1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15xDS Test Client.
16
17TODO(sergiitk): separate XdsTestClient and KubernetesClientRunner to individual
18modules.
19"""
20import datetime
21import functools
22import logging
23from typing import Iterator, Optional
24
25from framework.helpers import retryers
26from framework.infrastructure import k8s
27import framework.rpc
28from framework.rpc import grpc_channelz
29from framework.rpc import grpc_testing
30from framework.test_app import base_runner
31
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type aliases
# Short private names for the datetime and gRPC helper types that the
# annotations and calls in this module refer to.
_timedelta = datetime.timedelta
# Client for the test app's load balancer stats service.
_LoadBalancerStatsServiceClient = grpc_testing.LoadBalancerStatsServiceClient
# Channelz client and the channelz types used in this module.
_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
_ChannelzChannel = grpc_channelz.Channel
_ChannelzChannelState = grpc_channelz.ChannelState
_ChannelzSubchannel = grpc_channelz.Subchannel
_ChannelzSocket = grpc_channelz.Socket
42
43
class XdsTestClient(framework.rpc.grpc.GrpcApp):
    """
    Represents RPC services implemented in Client component of the xds test app.
    https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#client
    """

    def __init__(self,
                 *,
                 ip: str,
                 rpc_port: int,
                 server_target: str,
                 rpc_host: Optional[str] = None,
                 maintenance_port: Optional[int] = None):
        """Initialize the test client handle.

        Args:
            ip: IP address of the client; also used as the RPC host when
                rpc_host is not provided.
            rpc_port: Port used for the load balancer stats service channel.
            server_target: Target the client connects to; used to look up
                the matching channels in channelz.
            rpc_host: Optional host to use for RPCs instead of ip.
            maintenance_port: Port used for the channelz channel. Falls back
                to rpc_port when not provided.
        """
        super().__init__(rpc_host=(rpc_host or ip))
        self.ip = ip
        self.rpc_port = rpc_port
        self.server_target = server_target
        self.maintenance_port = maintenance_port or rpc_port

        # Lazily created service clients. Cached per instance rather than
        # with functools.lru_cache on the property getter: a module-level
        # cache keyed on self would keep every instance alive for the
        # lifetime of the process.
        self._load_balancer_stats_client: Optional[
            _LoadBalancerStatsServiceClient] = None
        self._channelz_client: Optional[_ChannelzServiceClient] = None

    @property
    def load_balancer_stats(self) -> _LoadBalancerStatsServiceClient:
        """Load balancer stats client on rpc_port, created on first access."""
        if self._load_balancer_stats_client is None:
            self._load_balancer_stats_client = (
                _LoadBalancerStatsServiceClient(
                    self._make_channel(self.rpc_port)))
        return self._load_balancer_stats_client

    @property
    def channelz(self) -> _ChannelzServiceClient:
        """Channelz client on maintenance_port, created on first access."""
        if self._channelz_client is None:
            self._channelz_client = _ChannelzServiceClient(
                self._make_channel(self.maintenance_port))
        return self._channelz_client

    def get_load_balancer_stats(
            self,
            *,
            num_rpcs: int,
            timeout_sec: Optional[int] = None,
    ) -> grpc_testing.LoadBalancerStatsResponse:
        """
        Shortcut to LoadBalancerStatsServiceClient.get_client_stats()
        """
        return self.load_balancer_stats.get_client_stats(
            num_rpcs=num_rpcs, timeout_sec=timeout_sec)

    def get_server_channels(self) -> Iterator[_ChannelzChannel]:
        """Return the channelz channels whose target is server_target."""
        return self.channelz.find_channels_for_target(self.server_target)

    def wait_for_active_server_channel(self) -> _ChannelzChannel:
        """Wait for the channel to the server to transition to READY.

        Raises:
            GrpcApp.NotFound: If the channel never transitioned to READY.
        """
        return self.wait_for_server_channel_state(_ChannelzChannelState.READY)

    def get_active_server_channel(self) -> _ChannelzChannel:
        """Return a READY channel to the server.

        Raises:
            GrpcApp.NotFound: If there's no READY channel to the server.
        """
        return self.find_server_channel_with_state(_ChannelzChannelState.READY)

    def get_active_server_channel_socket(self) -> _ChannelzSocket:
        """Return the first socket of the first subchannel of a READY channel.

        Extra subchannels or sockets are not expected; when present they are
        logged as warnings and otherwise ignored.

        Raises:
            GrpcApp.NotFound: If there's no READY channel to the server.
            IndexError: If the READY channel has no subchannel refs.
            ValueError: If channelz lists no subchannels for the channel, or
                no sockets for its first subchannel.
        """
        channel = self.get_active_server_channel()
        # Get the first subchannel of the active channel to the server.
        logger.debug(
            'Retrieving client -> server socket, '
            'channel_id: %s, subchannel: %s', channel.ref.channel_id,
            channel.subchannel_ref[0].name)
        subchannel, *subchannels = self.channelz.list_channel_subchannels(
            channel)
        if subchannels:
            logger.warning('Unexpected subchannels: %r', subchannels)
        # Get the first socket of the subchannel
        socket, *sockets = self.channelz.list_subchannels_sockets(subchannel)
        if sockets:
            # Report the extra sockets themselves, not the subchannels.
            logger.warning('Unexpected sockets: %r', sockets)
        logger.debug('Found client -> server socket: %s', socket.ref.name)
        return socket

    def wait_for_server_channel_state(self,
                                      state: _ChannelzChannelState,
                                      *,
                                      timeout: Optional[_timedelta] = None
                                     ) -> _ChannelzChannel:
        """Retry until the client reports a server channel in the given state.

        Args:
            state: The channelz channel state to wait for.
            timeout: Total time to keep retrying. Defaults to 3 minutes.

        Returns:
            The channel found by find_server_channel_with_state().

        Note: what is raised when the timeout expires is decided by the
        retryer created via retryers.exponential_retryer_with_timeout().
        """
        # Fine-tuned to wait for the channel to the server.
        retryer = retryers.exponential_retryer_with_timeout(
            wait_min=_timedelta(seconds=10),
            wait_max=_timedelta(seconds=25),
            timeout=_timedelta(minutes=3) if timeout is None else timeout)

        logger.info('Waiting for client %s to report a %s channel to %s',
                    self.ip, _ChannelzChannelState.Name(state),
                    self.server_target)
        channel = retryer(self.find_server_channel_with_state, state)
        logger.info('Client %s channel to %s transitioned to state %s:\n%s',
                    self.ip, self.server_target,
                    _ChannelzChannelState.Name(state), channel)
        return channel

    def find_server_channel_with_state(self,
                                       state: _ChannelzChannelState,
                                       *,
                                       check_subchannel=True
                                      ) -> _ChannelzChannel:
        """Return the first server channel that is in the given state.

        Args:
            state: The channelz channel state to look for.
            check_subchannel: When True, a channel only matches if it also
                has at least one subchannel in the same state.

        Raises:
            GrpcApp.NotFound: If no channel to the server matches.
        """
        for channel in self.get_server_channels():
            channel_state: _ChannelzChannelState = channel.data.state.state
            logger.info('Server channel: %s, state: %s', channel.ref.name,
                        _ChannelzChannelState.Name(channel_state))
            # Enum values are plain integers: compare by value, not identity.
            if channel_state != state:
                continue

            if check_subchannel:
                # When requested, check if the channel has at least
                # one subchannel in the requested state.
                try:
                    subchannel = self.find_subchannel_with_state(
                        channel, state)
                except self.NotFound as e:
                    # Otherwise, keep searching.
                    logger.info(e.message)
                    continue
                logger.info('Found subchannel in state %s: %s',
                            _ChannelzChannelState.Name(state), subchannel)
            return channel

        raise self.NotFound(
            f'Client has no {_ChannelzChannelState.Name(state)} channel with '
            'the server')

    def find_subchannel_with_state(self, channel: _ChannelzChannel,
                                   state: _ChannelzChannelState
                                  ) -> _ChannelzSubchannel:
        """Return the first subchannel of the channel in the given state.

        Raises:
            GrpcApp.NotFound: If the channel has no subchannel in that state.
        """
        for subchannel in self.channelz.list_channel_subchannels(channel):
            # Enum values are plain integers: compare by value, not identity.
            if subchannel.data.state.state == state:
                return subchannel

        raise self.NotFound(
            f'Not found a {_ChannelzChannelState.Name(state)} '
            f'subchannel for channel_id {channel.ref.channel_id}')
183
class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
    """Deploys the xDS test client to Kubernetes and tears it down again."""

    def __init__(self,
                 k8s_namespace,
                 *,
                 deployment_name,
                 image_name,
                 gcp_service_account,
                 td_bootstrap_image,
                 service_account_name=None,
                 stats_port=8079,
                 network='default',
                 deployment_template='client.deployment.yaml',
                 service_account_template='service-account.yaml',
                 reuse_namespace=False,
                 namespace_template=None,
                 debug_use_port_forwarding=False):
        """Store the runner settings; nothing is created until run().

        When service_account_name is not given, the deployment name is used
        as the service account name.
        """
        super().__init__(k8s_namespace, namespace_template, reuse_namespace)

        # Resources created by run() and released by cleanup().
        self.deployment: Optional[k8s.V1Deployment] = None
        self.service_account: Optional[k8s.V1ServiceAccount] = None
        self.port_forwarder = None

        # Templates the resources are created from.
        self.deployment_template = deployment_template
        self.service_account_template = service_account_template

        # Deployment settings.
        self.deployment_name = deployment_name
        self.image_name = image_name
        self.network = network
        self.stats_port = stats_port
        # xDS bootstrap generator
        self.td_bootstrap_image = td_bootstrap_image

        # Service account settings.
        self.gcp_service_account = gcp_service_account
        if service_account_name:
            self.service_account_name = service_account_name
        else:
            self.service_account_name = deployment_name

        # Local debugging switch.
        self.debug_use_port_forwarding = debug_use_port_forwarding

    def run(self,
            *,
            server_target,
            rpc='UnaryCall',
            qps=25,
            secure_mode=False,
            print_response=False) -> XdsTestClient:
        """Create the service account and deployment, return a client handle.

        The returned XdsTestClient uses the IP of the first pod of the
        deployment and the stats port. With debug_use_port_forwarding set,
        the stats port of that pod is forwarded and the client is pointed at
        the namespace's local port-forwarding address instead.
        """
        super().run()
        # TODO(sergiitk): make rpc UnaryCall enum or get it from proto

        # Service account comes first.
        self.service_account = self._create_service_account(
            self.service_account_template,
            service_account_name=self.service_account_name,
            namespace_name=self.k8s_namespace.name,
            gcp_service_account=self.gcp_service_account)

        # A new deployment is created on every run.
        deployment_settings = dict(
            deployment_name=self.deployment_name,
            image_name=self.image_name,
            namespace_name=self.k8s_namespace.name,
            service_account_name=self.service_account_name,
            td_bootstrap_image=self.td_bootstrap_image,
            network_name=self.network,
            stats_port=self.stats_port,
            server_target=server_target,
            rpc=rpc,
            qps=qps,
            secure_mode=secure_mode,
            print_response=print_response)
        self.deployment = self._create_deployment(self.deployment_template,
                                                  **deployment_settings)

        self._wait_deployment_with_available_replicas(self.deployment_name)

        # Only one test client is needed at the moment: take the first pod.
        deployment_pods = self.k8s_namespace.list_deployment_pods(
            self.deployment)
        client_pod = deployment_pods[0]
        self._wait_pod_started(client_pod.metadata.name)
        client_ip = client_pod.status.pod_ip

        # Experimental, for local debugging.
        forwarded_host = None
        if self.debug_use_port_forwarding:
            logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
                        client_ip, self.stats_port)
            self.port_forwarder = self.k8s_namespace.port_forward_pod(
                client_pod, remote_port=self.stats_port)
            forwarded_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS

        return XdsTestClient(ip=client_ip,
                             rpc_port=self.stats_port,
                             server_target=server_target,
                             rpc_host=forwarded_host)

    def cleanup(self, *, force=False, force_namespace=False):
        """Stop port forwarding and delete the resources created by run().

        Args:
            force: Delete the deployment and the service account even when
                this runner holds no record of having created them.
            force_namespace: Forwarded to the base class cleanup as its
                force flag, but only when force is set as well.
        """
        if self.port_forwarder:
            self.k8s_namespace.port_forward_stop(self.port_forwarder)
            self.port_forwarder = None

        if force or self.deployment:
            self._delete_deployment(self.deployment_name)
            self.deployment = None

        if force or self.service_account:
            self._delete_service_account(self.service_account_name)
            self.service_account = None

        super().cleanup(force=force_namespace and force)
286