1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15xDS Test Server.
16
TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner into
individual modules.
19"""
20import functools
21import logging
22from typing import Iterator, Optional
23
24from framework.infrastructure import k8s
25import framework.rpc
26from framework.rpc import grpc_channelz
27from framework.test_app import base_runner
28
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type aliases
# Short private name for the channelz client class; used in this module both
# as a return annotation and to construct the client.
_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
33
34
class XdsTestServer(framework.rpc.grpc.GrpcApp):
    """
    Represents RPC services implemented in Server component of the xDS test app.
    https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server

    Attributes:
        ip: Address of the server as passed to the constructor.
        rpc_port: Port used to look up the test server in channelz.
        maintenance_port: Port the channelz client connects to.
        secure_mode: Stored as given; not interpreted by this class.
        server_id: Stored as given; not interpreted by this class.
        xds_host: Host part of the xDS address, if set.
        xds_port: Port part of the xDS address, if set.
    """

    # Lazily created channelz client, see the `channelz` property.
    # Kept per instance instead of in a functools.lru_cache keyed on `self`:
    # a class-level unbounded cache would hold a strong reference to every
    # server object (and its channel) for the lifetime of the process.
    # The class-level default keeps the property usable even for instances
    # created without running __init__.
    _channelz_client: Optional[_ChannelzServiceClient] = None

    def __init__(self,
                 *,
                 ip: str,
                 rpc_port: int,
                 maintenance_port: Optional[int] = None,
                 secure_mode: Optional[bool] = False,
                 server_id: Optional[str] = None,
                 xds_host: Optional[str] = None,
                 xds_port: Optional[int] = None,
                 rpc_host: Optional[str] = None):
        """Initialize the test server representation.

        Args:
            ip: Server address; also used as the RPC host when `rpc_host`
                is not provided.
            rpc_port: Port of the test service.
            maintenance_port: Port used for the channelz channel. Falls back
                to `rpc_port` when not provided (or falsy).
            secure_mode: Stored on the instance as is.
            server_id: Stored on the instance as is.
            xds_host: Optional host of the xDS address.
            xds_port: Optional port of the xDS address.
            rpc_host: Host passed to the base class initializer. Falls back
                to `ip` when not provided (or falsy).
        """
        super().__init__(rpc_host=(rpc_host or ip))
        self.ip = ip
        self.rpc_port = rpc_port
        self.maintenance_port = maintenance_port or rpc_port
        self.secure_mode = secure_mode
        self.server_id = server_id
        self.xds_host, self.xds_port = xds_host, xds_port

    @property
    def channelz(self) -> _ChannelzServiceClient:
        """Channelz client bound to the maintenance port.

        The client is created on first access and the same object is returned
        on every subsequent access for this instance.
        """
        if self._channelz_client is None:
            self._channelz_client = _ChannelzServiceClient(
                self._make_channel(self.maintenance_port))
        return self._channelz_client

    def set_xds_address(self,
                        xds_host: Optional[str],
                        xds_port: Optional[int] = None) -> None:
        """Replace the xDS host and port used by `xds_address`/`xds_uri`.

        Args:
            xds_host: New xDS host.
            xds_port: New xDS port; cleared when omitted.
        """
        self.xds_host, self.xds_port = xds_host, xds_port

    @property
    def xds_address(self) -> str:
        """The xDS address as `host` or `host:port`.

        Returns an empty string when no xDS host is set, and the bare host
        when no xDS port is set.
        """
        if not self.xds_host:
            return ''
        if not self.xds_port:
            return self.xds_host
        return f'{self.xds_host}:{self.xds_port}'

    @property
    def xds_uri(self) -> str:
        """The `xds:///` URI built from `xds_address`.

        Returns an empty string when no xDS host is set.
        """
        if not self.xds_host:
            return ''
        return f'xds:///{self.xds_address}'

    def get_test_server(self) -> grpc_channelz.Server:
        """Return channelz representation of a server running TestService.

        The server is looked up via channelz by the port it listens on
        (`rpc_port`).

        Raises:
            GrpcApp.NotFound: Test server not found.
        """
        server = self.channelz.find_server_listening_on_port(self.rpc_port)
        if not server:
            raise self.NotFound(
                f'Server listening on port {self.rpc_port} not found')
        return server

    def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]:
        """List all sockets of the test server.

        Raises:
            GrpcApp.NotFound: Test server not found.
        """
        server = self.get_test_server()
        return self.channelz.list_server_sockets(server)

    def get_server_socket_matching_client(
            self,
            client_socket: grpc_channelz.Socket) -> grpc_channelz.Socket:
        """Find test server socket that matches given test client socket.

        Sockets are matched using TCP endpoints (ip:port), further on "address".
        Server socket remote address matched with client socket local address.

        Args:
            client_socket: Channelz socket of the test client.

        Returns:
            The test server socket paired with `client_socket`.

        Raises:
            GrpcApp.NotFound: Server socket matching client socket not found.
        """
        client_local = self.channelz.sock_address_to_str(client_socket.local)
        logger.debug('Looking for a server socket connected to the client %s',
                     client_local)

        server_socket = self.channelz.find_server_socket_matching_client(
            self.get_test_server_sockets(), client_socket)
        if not server_socket:
            raise self.NotFound(
                f'Server socket to client {client_local} not found')

        logger.info('Found matching socket pair: server(%s) <-> client(%s)',
                    self.channelz.sock_addresses_pretty(server_socket),
                    self.channelz.sock_addresses_pretty(client_socket))
        return server_socket
126
127
class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
    """Runs the xDS test server in a Kubernetes namespace.

    `run()` creates (or reuses) the service, creates the service account and
    the deployment, waits for the pods to start, and returns an
    `XdsTestServer` pointing at the first pod. `cleanup()` removes the
    resources again.
    """

    def __init__(self,
                 k8s_namespace,
                 *,
                 deployment_name,
                 image_name,
                 gcp_service_account,
                 service_account_name=None,
                 service_name=None,
                 neg_name=None,
                 td_bootstrap_image=None,
                 network: str = 'default',
                 deployment_template: str = 'server.deployment.yaml',
                 service_account_template: str = 'service-account.yaml',
                 service_template: str = 'server.service.yaml',
                 reuse_service: bool = False,
                 reuse_namespace: bool = False,
                 namespace_template=None,
                 debug_use_port_forwarding: bool = False):
        """Store the runner settings; no Kubernetes resources are created here.

        Args:
            k8s_namespace: Namespace object, forwarded to the base runner.
            deployment_name: Name of the server deployment.
            image_name: Image name passed to the deployment template.
            gcp_service_account: Passed to the service account template.
            service_account_name: Defaults to `deployment_name`.
            service_name: Defaults to `deployment_name`.
            neg_name: Defaults to `<namespace name>-<service name>`.
            td_bootstrap_image: xDS bootstrap generator image, passed to the
                deployment template.
            network: Passed to the deployment template as `network_name`.
            deployment_template: Template used to create the deployment.
            service_account_template: Template used to create the service
                account.
            service_template: Template used to create the service.
            reuse_service: Try to reuse an existing service in `run()`, and
                keep it on a non-forced `cleanup()`.
            reuse_namespace: Forwarded to the base runner.
            namespace_template: Forwarded to the base runner.
            debug_use_port_forwarding: Experimental, for local debugging:
                forward the pod's maintenance port to the local machine.
        """
        super().__init__(k8s_namespace, namespace_template, reuse_namespace)

        # Settings
        self.deployment_name = deployment_name
        self.image_name = image_name
        self.gcp_service_account = gcp_service_account
        self.service_account_name = service_account_name or deployment_name
        self.service_name = service_name or deployment_name
        # xDS bootstrap generator
        self.td_bootstrap_image = td_bootstrap_image
        # This only works in k8s >= 1.18.10-gke.600
        # https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs
        self.neg_name = neg_name or (f'{self.k8s_namespace.name}-'
                                     f'{self.service_name}')
        self.network = network
        self.deployment_template = deployment_template
        self.service_account_template = service_account_template
        self.service_template = service_template
        self.reuse_service = reuse_service
        self.debug_use_port_forwarding = debug_use_port_forwarding

        # Mutable state
        self.deployment: Optional[k8s.V1Deployment] = None
        self.service_account: Optional[k8s.V1ServiceAccount] = None
        self.service: Optional[k8s.V1Service] = None
        self.port_forwarder = None

    def run(self,
            *,
            test_port: int = 8080,
            maintenance_port: Optional[int] = None,
            secure_mode: bool = False,
            server_id: Optional[str] = None,
            replica_count: int = 1) -> XdsTestServer:
        """Deploy the test server and return a handle to it.

        Args:
            test_port: Port of the test service.
            maintenance_port: Port of the maintenance services. When omitted,
                defaults to `test_port`, or to `test_port + 1` in secure mode.
            secure_mode: Run the test server in secure mode.
            server_id: Passed to the deployment template and to the returned
                `XdsTestServer`.
            replica_count: Number of replicas; only 1 is supported.

        Returns:
            An `XdsTestServer` for the first pod of the deployment.

        Raises:
            NotImplementedError: `replica_count` is not 1.
            TypeError: A port number is not an integer.
            ValueError: In secure mode, `maintenance_port` equals `test_port`.
        """
        # TODO(sergiitk): multiple replicas
        if replica_count != 1:
            raise NotImplementedError("Multiple replicas not yet supported")

        # Validate port types before doing arithmetic on them or comparing
        # them, so a wrong type is always reported with the same clear error
        # and cannot slip past the comparison below (e.g. 8080 != "8080").
        if not isinstance(test_port, int):
            raise TypeError('Port numbers must be integer')

        # Implementation detail: in secure mode, maintenance ("backchannel")
        # port must be different from the test port so communication with
        # maintenance services can be reached independently from the security
        # configuration under test.
        if maintenance_port is None:
            maintenance_port = test_port if not secure_mode else test_port + 1
        elif not isinstance(maintenance_port, int):
            raise TypeError('Port numbers must be integer')
        if secure_mode and maintenance_port == test_port:
            raise ValueError('port and maintenance_port must be different '
                             'when running test server in secure mode')

        # Create namespace.
        super().run()

        # Reuse existing if requested, create a new deployment when missing.
        # Useful for debugging to avoid NEG losing relation to deleted service.
        if self.reuse_service:
            self.service = self._reuse_service(self.service_name)
        if not self.service:
            self.service = self._create_service(
                self.service_template,
                service_name=self.service_name,
                namespace_name=self.k8s_namespace.name,
                deployment_name=self.deployment_name,
                neg_name=self.neg_name,
                test_port=test_port)
        self._wait_service_neg(self.service_name, test_port)

        # Create service account
        self.service_account = self._create_service_account(
            self.service_account_template,
            service_account_name=self.service_account_name,
            namespace_name=self.k8s_namespace.name,
            gcp_service_account=self.gcp_service_account)

        # Always create a new deployment
        self.deployment = self._create_deployment(
            self.deployment_template,
            deployment_name=self.deployment_name,
            image_name=self.image_name,
            namespace_name=self.k8s_namespace.name,
            service_account_name=self.service_account_name,
            td_bootstrap_image=self.td_bootstrap_image,
            network_name=self.network,
            replica_count=replica_count,
            test_port=test_port,
            maintenance_port=maintenance_port,
            server_id=server_id,
            secure_mode=secure_mode)

        self._wait_deployment_with_available_replicas(self.deployment_name,
                                                      replica_count,
                                                      timeout_sec=120)

        # Wait for pods running
        pods = self.k8s_namespace.list_deployment_pods(self.deployment)
        for pod in pods:
            self._wait_pod_started(pod.metadata.name)

        # TODO(sergiitk): This is why multiple replicas not yet supported
        pod = pods[0]
        pod_ip = pod.status.pod_ip
        # None lets XdsTestServer fall back to the pod IP as the RPC host.
        rpc_host = None
        # Experimental, for local debugging.
        if self.debug_use_port_forwarding:
            logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
                        pod_ip, maintenance_port)
            self.port_forwarder = self.k8s_namespace.port_forward_pod(
                pod, remote_port=maintenance_port)
            rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS

        return XdsTestServer(ip=pod_ip,
                             rpc_port=test_port,
                             maintenance_port=maintenance_port,
                             secure_mode=secure_mode,
                             server_id=server_id,
                             rpc_host=rpc_host)

    def cleanup(self, *, force: bool = False, force_namespace: bool = False):
        """Stop port forwarding and delete the resources of this runner.

        Args:
            force: Delete the deployment, the service (even when it is
                reused) and the service account regardless of whether this
                runner recorded creating them.
            force_namespace: Together with `force`, forwarded to the base
                runner's cleanup as `force`; has no effect on its own.
        """
        if self.port_forwarder:
            self.k8s_namespace.port_forward_stop(self.port_forwarder)
            self.port_forwarder = None
        if self.deployment or force:
            self._delete_deployment(self.deployment_name)
            self.deployment = None
        # A reused service is kept unless cleanup is forced.
        if (self.service and not self.reuse_service) or force:
            self._delete_service(self.service_name)
            self.service = None
        if self.service_account or force:
            self._delete_service_account(self.service_account_name)
            self.service_account = None
        super().cleanup(force=(force_namespace and force))
281