1# Copyright 2020 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14""" 15xDS Test Server. 16 17TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner to individual 18modules. 19""" 20import functools 21import logging 22from typing import Iterator, Optional 23 24from framework.infrastructure import k8s 25import framework.rpc 26from framework.rpc import grpc_channelz 27from framework.test_app import base_runner 28 29logger = logging.getLogger(__name__) 30 31# Type aliases 32_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient 33 34 35class XdsTestServer(framework.rpc.grpc.GrpcApp): 36 """ 37 Represents RPC services implemented in Server component of the xDS test app. 38 https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server 39 """ 40 41 def __init__(self, 42 *, 43 ip: str, 44 rpc_port: int, 45 maintenance_port: Optional[int] = None, 46 secure_mode: Optional[bool] = False, 47 server_id: Optional[str] = None, 48 xds_host: Optional[str] = None, 49 xds_port: Optional[int] = None, 50 rpc_host: Optional[str] = None): 51 super().__init__(rpc_host=(rpc_host or ip)) 52 self.ip = ip 53 self.rpc_port = rpc_port 54 self.maintenance_port = maintenance_port or rpc_port 55 self.secure_mode = secure_mode 56 self.server_id = server_id 57 self.xds_host, self.xds_port = xds_host, xds_port 58 59 @property 60 @functools.lru_cache(None) 61 def channelz(self) -> _ChannelzServiceClient: 62 return _ChannelzServiceClient(self._make_channel(self.maintenance_port)) 63 64 def set_xds_address(self, xds_host, xds_port: Optional[int] = None): 65 self.xds_host, self.xds_port = xds_host, xds_port 66 67 @property 68 def xds_address(self) -> str: 69 if not self.xds_host: 70 return '' 71 if not self.xds_port: 72 return self.xds_host 73 return f'{self.xds_host}:{self.xds_port}' 74 75 @property 76 def xds_uri(self) -> str: 77 if not self.xds_host: 78 return '' 79 return f'xds:///{self.xds_address}' 80 81 def get_test_server(self) -> grpc_channelz.Server: 82 """Return channelz representation of a server running TestService. 83 84 Raises: 85 GrpcApp.NotFound: Test server not found. 86 """ 87 server = self.channelz.find_server_listening_on_port(self.rpc_port) 88 if not server: 89 raise self.NotFound( 90 f'Server listening on port {self.rpc_port} not found') 91 return server 92 93 def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]: 94 """List all sockets of the test server. 95 96 Raises: 97 GrpcApp.NotFound: Test server not found. 98 """ 99 server = self.get_test_server() 100 return self.channelz.list_server_sockets(server) 101 102 def get_server_socket_matching_client(self, 103 client_socket: grpc_channelz.Socket): 104 """Find test server socket that matches given test client socket. 105 106 Sockets are matched using TCP endpoints (ip:port), further on "address". 107 Server socket remote address matched with client socket local address. 108 109 Raises: 110 GrpcApp.NotFound: Server socket matching client socket not found. 111 """ 112 client_local = self.channelz.sock_address_to_str(client_socket.local) 113 logger.debug('Looking for a server socket connected to the client %s', 114 client_local) 115 116 server_socket = self.channelz.find_server_socket_matching_client( 117 self.get_test_server_sockets(), client_socket) 118 if not server_socket: 119 raise self.NotFound( 120 f'Server socket to client {client_local} not found') 121 122 logger.info('Found matching socket pair: server(%s) <-> client(%s)', 123 self.channelz.sock_addresses_pretty(server_socket), 124 self.channelz.sock_addresses_pretty(client_socket)) 125 return server_socket 126 127 128class KubernetesServerRunner(base_runner.KubernetesBaseRunner): 129 130 def __init__(self, 131 k8s_namespace, 132 *, 133 deployment_name, 134 image_name, 135 gcp_service_account, 136 service_account_name=None, 137 service_name=None, 138 neg_name=None, 139 td_bootstrap_image=None, 140 network='default', 141 deployment_template='server.deployment.yaml', 142 service_account_template='service-account.yaml', 143 service_template='server.service.yaml', 144 reuse_service=False, 145 reuse_namespace=False, 146 namespace_template=None, 147 debug_use_port_forwarding=False): 148 super().__init__(k8s_namespace, namespace_template, reuse_namespace) 149 150 # Settings 151 self.deployment_name = deployment_name 152 self.image_name = image_name 153 self.gcp_service_account = gcp_service_account 154 self.service_account_name = service_account_name or deployment_name 155 self.service_name = service_name or deployment_name 156 # xDS bootstrap generator 157 self.td_bootstrap_image = td_bootstrap_image 158 # This only works in k8s >= 1.18.10-gke.600 159 # https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs 160 self.neg_name = neg_name or (f'{self.k8s_namespace.name}-' 161 f'{self.service_name}') 162 self.network = network 163 self.deployment_template = deployment_template 164 self.service_account_template = service_account_template 165 self.service_template = service_template 166 self.reuse_service = reuse_service 167 self.debug_use_port_forwarding = debug_use_port_forwarding 168 169 # Mutable state 170 self.deployment: Optional[k8s.V1Deployment] = None 171 self.service_account: Optional[k8s.V1ServiceAccount] = None 172 self.service: Optional[k8s.V1Service] = None 173 self.port_forwarder = None 174 175 def run(self, 176 *, 177 test_port=8080, 178 maintenance_port=None, 179 secure_mode=False, 180 server_id=None, 181 replica_count=1) -> XdsTestServer: 182 # TODO(sergiitk): multiple replicas 183 if replica_count != 1: 184 raise NotImplementedError("Multiple replicas not yet supported") 185 186 # Implementation detail: in secure mode, maintenance ("backchannel") 187 # port must be different from the test port so communication with 188 # maintenance services can be reached independently from the security 189 # configuration under test. 190 if maintenance_port is None: 191 maintenance_port = test_port if not secure_mode else test_port + 1 192 if secure_mode and maintenance_port == test_port: 193 raise ValueError('port and maintenance_port must be different ' 194 'when running test server in secure mode') 195 # To avoid bugs with comparing wrong types. 196 if not (isinstance(test_port, int) and 197 isinstance(maintenance_port, int)): 198 raise TypeError('Port numbers must be integer') 199 200 # Create namespace. 201 super().run() 202 203 # Reuse existing if requested, create a new deployment when missing. 204 # Useful for debugging to avoid NEG loosing relation to deleted service. 205 if self.reuse_service: 206 self.service = self._reuse_service(self.service_name) 207 if not self.service: 208 self.service = self._create_service( 209 self.service_template, 210 service_name=self.service_name, 211 namespace_name=self.k8s_namespace.name, 212 deployment_name=self.deployment_name, 213 neg_name=self.neg_name, 214 test_port=test_port) 215 self._wait_service_neg(self.service_name, test_port) 216 217 # Create service account 218 self.service_account = self._create_service_account( 219 self.service_account_template, 220 service_account_name=self.service_account_name, 221 namespace_name=self.k8s_namespace.name, 222 gcp_service_account=self.gcp_service_account) 223 224 # Always create a new deployment 225 self.deployment = self._create_deployment( 226 self.deployment_template, 227 deployment_name=self.deployment_name, 228 image_name=self.image_name, 229 namespace_name=self.k8s_namespace.name, 230 service_account_name=self.service_account_name, 231 td_bootstrap_image=self.td_bootstrap_image, 232 network_name=self.network, 233 replica_count=replica_count, 234 test_port=test_port, 235 maintenance_port=maintenance_port, 236 server_id=server_id, 237 secure_mode=secure_mode) 238 239 self._wait_deployment_with_available_replicas(self.deployment_name, 240 replica_count, 241 timeout_sec=120) 242 243 # Wait for pods running 244 pods = self.k8s_namespace.list_deployment_pods(self.deployment) 245 for pod in pods: 246 self._wait_pod_started(pod.metadata.name) 247 248 # TODO(sergiitk): This is why multiple replicas not yet supported 249 pod = pods[0] 250 pod_ip = pod.status.pod_ip 251 rpc_host = None 252 # Experimental, for local debugging. 253 if self.debug_use_port_forwarding: 254 logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s', 255 pod_ip, maintenance_port) 256 self.port_forwarder = self.k8s_namespace.port_forward_pod( 257 pod, remote_port=maintenance_port) 258 rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS 259 260 return XdsTestServer(ip=pod_ip, 261 rpc_port=test_port, 262 maintenance_port=maintenance_port, 263 secure_mode=secure_mode, 264 server_id=server_id, 265 rpc_host=rpc_host) 266 267 def cleanup(self, *, force=False, force_namespace=False): 268 if self.port_forwarder: 269 self.k8s_namespace.port_forward_stop(self.port_forwarder) 270 self.port_forwarder = None 271 if self.deployment or force: 272 self._delete_deployment(self.deployment_name) 273 self.deployment = None 274 if (self.service and not self.reuse_service) or force: 275 self._delete_service(self.service_name) 276 self.service = None 277 if self.service_account or force: 278 self._delete_service_account(self.service_account_name) 279 self.service_account = None 280 super().cleanup(force=(force_namespace and force)) 281