1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import contextlib
15import logging
16import pathlib
17from typing import Optional
18
19import mako.template
20import yaml
21
22from framework.infrastructure import k8s
23
# Module-level logger named after this module; used by the runner below.
logger = logging.getLogger(__name__)
25
26
class RunnerError(Exception):
    """Signals that a runner failed while running an app.

    Raised in this module when a rendered manifest or the k8s object
    created from it does not match what the runner expected.
    """
29
30
class KubernetesBaseRunner:
    """Base class for runners managing resources through a k8s namespace.

    Resources are created from Mako templates that must render to a
    single-document YAML manifest. Templates are looked up in the directory
    TEMPLATE_DIR_RELATIVE_PATH, resolved relative to this file.
    """
    TEMPLATE_DIR_NAME = 'kubernetes-manifests'
    TEMPLATE_DIR_RELATIVE_PATH = f'../../{TEMPLATE_DIR_NAME}'

    def __init__(self,
                 k8s_namespace,
                 namespace_template=None,
                 reuse_namespace=False):
        """Store the runner configuration; no k8s calls are made here.

        Args:
            k8s_namespace: Manager of the namespaced k8s resources.
            namespace_template: Template file name used to create the
                namespace. Falls back to 'namespace.yaml' when falsy.
            reuse_namespace: When true, run() first tries to reuse the
                existing namespace and cleanup() keeps it unless forced.
        """
        # Kubernetes namespaced resources manager
        self.k8s_namespace: k8s.KubernetesNamespace = k8s_namespace
        self.reuse_namespace = reuse_namespace
        self.namespace_template = namespace_template or 'namespace.yaml'

        # Mutable state
        self.namespace: Optional[k8s.V1Namespace] = None

    def run(self, **kwargs):
        """Make sure self.namespace is set, reusing or creating the namespace.

        Args:
            **kwargs: Not used by this base implementation.
        """
        if self.reuse_namespace:
            self.namespace = self._reuse_namespace()
        # Also covers the case when the reuse lookup returned nothing.
        if not self.namespace:
            self.namespace = self._create_namespace(
                self.namespace_template, namespace_name=self.k8s_namespace.name)

    def cleanup(self, *, force=False):
        """Delete the namespace unless it is being reused.

        Args:
            force: Delete the namespace regardless of whether it is marked
                for reuse or was ever set by this runner.
        """
        if (self.namespace and not self.reuse_namespace) or force:
            self._delete_namespace()
            self.namespace = None

    @staticmethod
    def _render_template(template_file, **kwargs):
        """Render the Mako template file with kwargs as template variables."""
        template = mako.template.Template(filename=str(template_file))
        return template.render(**kwargs)

    @staticmethod
    def _manifests_from_yaml_file(yaml_file):
        """Yield every YAML document parsed from the file at yaml_file."""
        with open(yaml_file) as f:
            with contextlib.closing(yaml.safe_load_all(f)) as yml:
                yield from yml

    @staticmethod
    def _manifests_from_str(document):
        """Yield every YAML document parsed from the given string."""
        with contextlib.closing(yaml.safe_load_all(document)) as yml:
            yield from yml

    @classmethod
    def _template_file_from_name(cls, template_name):
        """Return the resolved path of template_name in the templates dir."""
        templates_path = (pathlib.Path(__file__).parent /
                          cls.TEMPLATE_DIR_RELATIVE_PATH)
        return templates_path.joinpath(template_name).resolve()

    def _create_from_template(self, template_name, **kwargs):
        """Render a template and apply the single manifest it produces.

        Args:
            template_name: File name of the template in the templates dir.
            **kwargs: Variables passed to the template renderer.

        Returns:
            The single object returned by applying the manifest.

        Raises:
            RunnerError: If the rendered template has no document, has more
                than one non-empty document, or if applying the manifest
                does not produce exactly one object.
        """
        template_file = self._template_file_from_name(template_name)
        logger.debug("Loading k8s manifest template: %s", template_file)

        yaml_doc = self._render_template(template_file, **kwargs)
        logger.info("Rendered template %s/%s:\n%s", self.TEMPLATE_DIR_NAME,
                    template_name, yaml_doc)

        manifests = self._manifests_from_str(yaml_doc)
        # The generator is never run to exhaustion, so close it explicitly
        # to release the YAML loader instead of waiting for garbage collection.
        with contextlib.closing(manifests):
            # A default is required here: a bare next() on an empty template
            # would leak StopIteration instead of a RunnerError.
            manifest = next(manifests, None)
            if manifest is None:
                raise RunnerError('Exactly one document expected in manifest '
                                  f'{template_file}')
            # Error out on multi-document yaml
            if next(manifests, False):
                raise RunnerError('Exactly one document expected in manifest '
                                  f'{template_file}')

        k8s_objects = self.k8s_namespace.apply_manifest(manifest)
        if len(k8s_objects) != 1:
            raise RunnerError('Exactly one object must be created from '
                              f'manifest {template_file}')

        created = k8s_objects[0]
        logger.info('%s %s created', created.kind, created.metadata.name)
        return created

    def _reuse_deployment(self, deployment_name) -> k8s.V1Deployment:
        """Fetch the existing deployment with the given name."""
        deployment = self.k8s_namespace.get_deployment(deployment_name)
        # TODO(sergiitk): check if good or must be recreated
        return deployment

    def _reuse_service(self, service_name) -> k8s.V1Service:
        """Fetch the existing service with the given name."""
        service = self.k8s_namespace.get_service(service_name)
        # TODO(sergiitk): check if good or must be recreated
        return service

    def _reuse_namespace(self) -> k8s.V1Namespace:
        """Fetch the existing namespace via the namespace manager."""
        return self.k8s_namespace.get()

    def _create_resource_from_template(self, template, kind, kind_label,
                                       name_key, template_kwargs):
        """Create one resource from a template and validate type and name.

        Shared implementation of the typed _create_* methods.

        Args:
            template: Template file name passed to _create_from_template.
            kind: Class the created object must be an instance of.
            kind_label: Name of that class as used in log/error messages.
            name_key: Key in template_kwargs holding the expected name.
            template_kwargs: Dict of variables passed to the template.

        Returns:
            The created object.

        Raises:
            RunnerError: If the created object is not an instance of kind,
                or its metadata name differs from template_kwargs[name_key].
        """
        resource = self._create_from_template(template, **template_kwargs)
        if not isinstance(resource, kind):
            raise RunnerError(f'Expected {kind_label} to be created '
                              f'from manifest {template}')
        if resource.metadata.name != template_kwargs[name_key]:
            raise RunnerError(f'{kind_label} created with unexpected name: '
                              f'{resource.metadata.name}')
        logger.debug('%s %s created at %s', kind_label,
                     resource.metadata.self_link,
                     resource.metadata.creation_timestamp)
        return resource

    def _create_namespace(self, template, **kwargs) -> k8s.V1Namespace:
        """Create a namespace; kwargs must include 'namespace_name'.

        Raises:
            RunnerError: If the created object is not a V1Namespace or its
                name differs from kwargs['namespace_name'].
        """
        return self._create_resource_from_template(template, k8s.V1Namespace,
                                                   'V1Namespace',
                                                   'namespace_name', kwargs)

    def _create_service_account(self, template,
                                **kwargs) -> k8s.V1ServiceAccount:
        """Create a service account; kwargs must include
        'service_account_name'.

        Raises:
            RunnerError: If the created object is not a V1ServiceAccount or
                its name differs from kwargs['service_account_name'].
        """
        return self._create_resource_from_template(template,
                                                   k8s.V1ServiceAccount,
                                                   'V1ServiceAccount',
                                                   'service_account_name',
                                                   kwargs)

    def _create_deployment(self, template, **kwargs) -> k8s.V1Deployment:
        """Create a deployment; kwargs must include 'deployment_name'.

        Raises:
            RunnerError: If the created object is not a V1Deployment or its
                name differs from kwargs['deployment_name'].
        """
        return self._create_resource_from_template(template, k8s.V1Deployment,
                                                   'V1Deployment',
                                                   'deployment_name', kwargs)

    def _create_service(self, template, **kwargs) -> k8s.V1Service:
        """Create a service; kwargs must include 'service_name'.

        Raises:
            RunnerError: If the created object is not a V1Service or its
                name differs from kwargs['service_name'].
        """
        return self._create_resource_from_template(template, k8s.V1Service,
                                                   'V1Service', 'service_name',
                                                   kwargs)

    def _delete_deployment(self, name, wait_for_deletion=True):
        """Delete a deployment, optionally waiting until it is gone.

        Best effort: a k8s.ApiException from the delete call is logged and
        swallowed, and no waiting is done in that case.
        """
        logger.info('Deleting deployment %s', name)
        try:
            self.k8s_namespace.delete_deployment(name)
        except k8s.ApiException as e:
            logger.info('Deployment %s deletion failed, error: %s %s', name,
                        e.status, e.reason)
            return

        if wait_for_deletion:
            self.k8s_namespace.wait_for_deployment_deleted(name)
        logger.debug('Deployment %s deleted', name)

    def _delete_service(self, name, wait_for_deletion=True):
        """Delete a service, optionally waiting until it is gone.

        Best effort: a k8s.ApiException from the delete call is logged and
        swallowed, and no waiting is done in that case.
        """
        logger.info('Deleting service %s', name)
        try:
            self.k8s_namespace.delete_service(name)
        except k8s.ApiException as e:
            logger.info('Service %s deletion failed, error: %s %s', name,
                        e.status, e.reason)
            return

        if wait_for_deletion:
            self.k8s_namespace.wait_for_service_deleted(name)
        logger.debug('Service %s deleted', name)

    def _delete_service_account(self, name, wait_for_deletion=True):
        """Delete a service account, optionally waiting until it is gone.

        Best effort: a k8s.ApiException from the delete call is logged and
        swallowed, and no waiting is done in that case.
        """
        logger.info('Deleting service account %s', name)
        try:
            self.k8s_namespace.delete_service_account(name)
        except k8s.ApiException as e:
            logger.info('Service account %s deletion failed, error: %s %s',
                        name, e.status, e.reason)
            return

        if wait_for_deletion:
            self.k8s_namespace.wait_for_service_account_deleted(name)
        logger.debug('Service account %s deleted', name)

    def _delete_namespace(self, wait_for_deletion=True):
        """Delete the managed namespace, optionally waiting until it is gone.

        Best effort: a k8s.ApiException from the delete call is logged and
        swallowed, and no waiting is done in that case.
        """
        logger.info('Deleting namespace %s', self.k8s_namespace.name)
        try:
            self.k8s_namespace.delete()
        except k8s.ApiException as e:
            logger.info('Namespace %s deletion failed, error: %s %s',
                        self.k8s_namespace.name, e.status, e.reason)
            return

        if wait_for_deletion:
            self.k8s_namespace.wait_for_namespace_deleted()
        logger.debug('Namespace %s deleted', self.k8s_namespace.name)

    def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs):
        """Wait for the deployment to have count available replicas.

        Args:
            name: Deployment name.
            count: Number of available replicas to wait for.
            **kwargs: Forwarded to wait_for_deployment_available_replicas.
        """
        logger.info('Waiting for deployment %s to have %s available replica(s)',
                    name, count)
        self.k8s_namespace.wait_for_deployment_available_replicas(
            name, count, **kwargs)
        deployment = self.k8s_namespace.get_deployment(name)
        # %s rather than %i: the replica count may be None, which %i cannot
        # format; integers render the same either way.
        logger.info('Deployment %s has %s replicas available',
                    deployment.metadata.name,
                    deployment.status.available_replicas)

    def _wait_pod_started(self, name, **kwargs):
        """Wait for the pod to start, then log its IP.

        Args:
            name: Pod name.
            **kwargs: Forwarded to wait_for_pod_started.
        """
        logger.info('Waiting for pod %s to start', name)
        self.k8s_namespace.wait_for_pod_started(name, **kwargs)
        pod = self.k8s_namespace.get_pod(name)
        logger.info('Pod %s ready, IP: %s', pod.metadata.name,
                    pod.status.pod_ip)

    def _wait_service_neg(self, name, service_port, **kwargs):
        """Wait for the service NEG, then log its name and zones.

        Args:
            name: Service name.
            service_port: Port used to look up the NEG after the wait; it is
                not passed to the wait call itself.
            **kwargs: Forwarded to wait_for_service_neg.
        """
        logger.info('Waiting for NEG for service %s', name)
        self.k8s_namespace.wait_for_service_neg(name, **kwargs)
        neg_name, neg_zones = self.k8s_namespace.get_service_neg(
            name, service_port)
        logger.info("Service %s: detected NEG=%s in zones=%s", name, neg_name,
                    neg_zones)
247