1#!/usr/bin/env python
2# Copyright 2020 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Run xDS integration tests on GCP using Traffic Director."""
16
17import argparse
18import googleapiclient.discovery
19import grpc
20import json
21import logging
22import os
23import random
24import shlex
25import socket
26import subprocess
27import sys
28import tempfile
29import time
30
31from oauth2client.client import GoogleCredentials
32
33import python_utils.jobset as jobset
34import python_utils.report_utils as report_utils
35
36from src.proto.grpc.health.v1 import health_pb2
37from src.proto.grpc.health.v1 import health_pb2_grpc
38from src.proto.grpc.testing import empty_pb2
39from src.proto.grpc.testing import messages_pb2
40from src.proto.grpc.testing import test_pb2_grpc
41
# Root logger setup: emit timestamped, level-tagged records to stderr.
# Any handlers installed by previously-imported modules are discarded so
# log output is not duplicated. Verbosity may be raised to DEBUG later
# (via the --verbose flag handling below).
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)
49
50_TEST_CASES = [
51    'backends_restart',
52    'change_backend_service',
53    'gentle_failover',
54    'ping_pong',
55    'remove_instance_group',
56    'round_robin',
57    'secondary_locality_gets_no_requests_on_partial_primary_failure',
58    'secondary_locality_gets_requests_on_primary_failure',
59    'traffic_splitting',
60]
61# Valid test cases, but not in all. So the tests can only run manually, and
62# aren't enabled automatically for all languages.
63#
64# TODO: Move them into _TEST_CASES when support is ready in all languages.
65_ADDITIONAL_TEST_CASES = [
66    'path_matching',
67    'header_matching',
68    'circuit_breaking',
69]
70
71
72def parse_test_cases(arg):
73    if arg == '':
74        return []
75    arg_split = arg.split(',')
76    test_cases = set()
77    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
78    for arg in arg_split:
79        if arg == "all":
80            test_cases = test_cases.union(_TEST_CASES)
81        else:
82            test_cases = test_cases.union([arg])
83    if not all([test_case in all_test_cases for test_case in test_cases]):
84        raise Exception('Failed to parse test cases %s' % arg)
85    # Perserve order.
86    return [x for x in all_test_cases if x in test_cases]
87
88
def parse_port_range(port_arg):
    """Parse a service-port argument into a range of candidate ports.

    Args:
      port_arg: Either a single port (e.g. '8080') or an inclusive
        'min:max' range (e.g. '8080:8110').

    Returns:
      A range covering the requested port(s), both bounds inclusive.
    """
    try:
        port = int(port_arg)
    except ValueError:
        # Not a single integer; must be an inclusive 'min:max' range.
        # (The original bare `except:` also swallowed KeyboardInterrupt etc.)
        port_min, port_max = port_arg.split(':')
        return range(int(port_min), int(port_max) + 1)
    return range(port, port + 1)
96
97
# Command-line interface. Flags are parsed at import time; the resulting
# `args` namespace is read throughout this module.
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
argp.add_argument('--project_id', help='GCP project id')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s, '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--xds_v3_support',
    default=False,
    action='store_true',
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument(
    '--client_hosts',
    default=None,
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running on '
    'the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updated GCP resources')
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help=
    'If set, find and use already created GCP resources instead of creating new'
    ' ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help=
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help=
    'If provided, uses this file instead of retrieving via the GCP discovery '
    'API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
argp.add_argument('--service_port_range',
                  default='8080:8110',
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases (e.g., '
    'round_robin) may not give meaningful results if this is set to a value '
    'less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure',
                  default=False,
                  action='store_true')
args = argp.parse_args()
230
# --verbose raises the root logger from WARNING (set above) to DEBUG.
if args.verbose:
    logger.setLevel(logging.DEBUG)

# Hosts running externally-managed test clients; empty means a locally
# launched client (see --client_hosts help text above).
CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

# Timeouts and sizing knobs used throughout the tests (seconds unless noted).
_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
_WAIT_FOR_STATS_SEC = 360
_WAIT_FOR_VALID_CONFIG_SEC = 60
_WAIT_FOR_URL_MAP_PATCH_SEC = 300
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
# xDS bootstrap JSON template. Two-stage substitution: the %-formatting here
# bakes in network/zone/server at import time, while the doubled braces leave
# {node_id} and {server_features} as str.format placeholders — presumably
# filled in later where the bootstrap file is generated (not in this chunk).
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend service
# can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# Base names for generated GCP resources (--gcp_suffix distinguishes runs,
# per its help text above).
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
# Report/log destinations, relative to this file's location.
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  '../../reports')
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'
296
297
def get_client_stats(num_rpcs, timeout_sec):
    """Query the LoadBalancerStats service on the test client.

    Returns the LoadBalancerStatsResponse from the first configured client
    host, or from localhost when no client hosts are configured. (Only the
    first host is ever queried: the function returns inside the loop.)
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            logger.debug('Invoking GetClientStats RPC to %s:%d:', host,
                         args.stats_port)
            response = stub.GetClientStats(request,
                                           wait_for_ready=True,
                                           timeout=rpc_timeout)
            logger.debug('Invoked GetClientStats RPC to %s: %s', host, response)
            return response
318
319
def get_client_accumulated_stats():
    """Query accumulated RPC stats from the test client.

    Returns the LoadBalancerAccumulatedStatsResponse from the first
    configured client host, or from localhost when none are configured.
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
            logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:',
                         host, args.stats_port)
            response = stub.GetClientAccumulatedStats(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s',
                         host, response)
            return response
337
338
def configure_client(rpc_types, metadata):
    """Push an RPC configuration to every test client host.

    Args:
      rpc_types: RPC types for the client to send.
      metadata: iterable of (rpc_type, key, value) triples to attach to the
        corresponding RPC type.
    """
    hosts = CLIENT_HOSTS or ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                entry = request.metadata.add()
                entry.type = rpc_type
                entry.key = md_key
                entry.value = md_value
            logger.debug(
                'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
                host, args.stats_port, request)
            stub.Configure(request,
                           wait_for_ready=True,
                           timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s',
                         host)
363
364
class RpcDistributionError(Exception):
    """Raised when RPCs are not distributed across backends as expected."""
    pass
367
368
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   allow_failures):
    """Poll client stats until exactly the given backends receive load.

    Args:
      backends: backend names expected to receive traffic.
      timeout_sec: how long to keep polling before giving up.
      num_rpcs: number of RPCs sampled per poll.
      allow_failures: when False, any failed RPC is an error.

    Raises:
      RpcDistributionError: with the most recent mismatch if the
        distribution does not converge within timeout_sec.
    """
    deadline = time.time() + timeout_sec
    error_msg = None
    logger.debug('Waiting for %d sec until backends %s receive load' %
                 (timeout_sec, backends))
    while time.time() <= deadline:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        missing = [b for b in backends if b not in rpcs_by_peer]
        if missing:
            error_msg = 'Backend %s did not receive load' % missing[0]
        elif len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)
390
391
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     timeout_sec,
                                                     num_rpcs=_NUM_TEST_RPCS):
    """Wait until only the given backends receive load; failed RPCs are OK."""
    _verify_rpcs_to_given_backends(backends,
                                   timeout_sec,
                                   num_rpcs,
                                   allow_failures=True)
399
400
def wait_until_all_rpcs_go_to_given_backends(backends,
                                             timeout_sec,
                                             num_rpcs=_NUM_TEST_RPCS):
    """Wait until only the given backends receive load, with no RPC failures."""
    _verify_rpcs_to_given_backends(backends,
                                   timeout_sec,
                                   num_rpcs,
                                   allow_failures=False)
408
409
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    """Block until the test client reaches the state with the given number
    of RPCs being outstanding stably.

    Args:
      rpc_type: A string indicating the RPC method to check for. Either
        'UnaryCall' or 'EmptyCall'.
      timeout_sec: Maximum number of seconds to wait until the desired state
        is reached.
      num_rpcs: Expected number of RPCs to be in-flight.
      threshold: Number within [0,100], the tolerable percentage by which
        the actual number of RPCs in-flight can differ from the expected number.

    Raises:
      ValueError: if threshold is outside [0,100].
      Exception: if the in-flight count never stabilizes within tolerance.
    """
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    deadline = time.time() + timeout_sec
    error_msg = None
    logger.debug(
        'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' %
        (timeout_sec, num_rpcs, rpc_type, threshold))
    while time.time() <= deadline:
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
        if not error_msg:
            break
        logger.debug('Progress: %s', error_msg)
        time.sleep(2)
    if not error_msg:
        # Recheck after a short pause to ensure the in-flight count is stable.
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
    if error_msg:
        raise Exception("Wrong number of %s RPCs in-flight: %s" %
                        (rpc_type, error_msg))
447
448
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    """Return an error string if in-flight RPCs are outside tolerance.

    Returns None when the in-flight count for rpc_type is within
    num_rpcs +/- threshold percent.
    """
    stats = get_client_accumulated_stats()
    started = stats.num_rpcs_started_by_method[rpc_type]
    succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    failed = stats.num_rpcs_failed_by_method[rpc_type]
    in_flight = started - succeeded - failed
    if in_flight < (num_rpcs * (1 - threshold_fraction)):
        return ('actual(%d) < expected(%d - %d%%)' %
                (in_flight, num_rpcs, threshold))
    if in_flight > (num_rpcs * (1 + threshold_fraction)):
        return ('actual(%d) > expected(%d + %d%%)' %
                (in_flight, num_rpcs, threshold))
    return None
463
464
def compare_distributions(actual_distribution, expected_distribution,
                          threshold):
    """Check that two distributions are similar within a threshold.

    Args:
      actual_distribution: A list of floats, contains the actual distribution.
      expected_distribution: A list of floats, contains the expected distribution.
      threshold: Number within [0,100], the threshold percentage by which the
        actual distribution can differ from the expected distribution.

    Returns:
      True if every element of the actual distribution lies within the
      threshold of the corresponding expected element. Note this function
      never returns False: any mismatch raises instead (the previous
      docstring's "false otherwise" claim was inaccurate).

    Raises:
      ValueError: if threshold is not within [0,100].
      Exception: if the distributions differ in size, or any element differs
        by more than the threshold; the message contains the details.
    """
    if len(expected_distribution) != len(actual_distribution):
        raise Exception(
            'Error: expected and actual distributions have different size (%d vs %d)'
            % (len(expected_distribution), len(actual_distribution)))
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        if actual < (expected * (1 - threshold_fraction)):
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (actual, expected, threshold))
        if actual > (expected * (1 + threshold_fraction)):
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (actual, expected, threshold))
    return True
499
500
def compare_expected_instances(stats, expected_instances):
    """Compare if stats have expected instances for each type of RPC.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict with key as the RPC type (string), value as
        the expected backend instances (list of strings).

    Returns:
      Returns true if the instances are expected. False if not.
    """
    for rpc_type, expected_peers in expected_instances.items():
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        # Fall back to an empty mapping instead of None: the original code
        # produced None here and then crashed on .keys() below when no stats
        # existed for rpc_type.
        rpcs_by_peer = (rpcs_by_peer_for_type.rpcs_by_peer
                        if rpcs_by_peer_for_type else {})
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            return False
    return True
522
523
def test_backends_restart(gcp, backend_service, instance_group):
    """Scale the instance group to zero and back, verifying traffic drains
    and then returns to the restarted backends.
    """
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    try:
        resize_instance_group(gcp, instance_group, 0)
        # With zero instances, no backend should be receiving load; RPC
        # failures are expected while the group is empty.
        wait_until_all_rpcs_go_to_given_backends_or_fail([],
                                                         _WAIT_FOR_BACKEND_SEC)
    finally:
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    # Instance names may change after the restart; re-query before verifying.
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
541
542
def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    """Point the URL map at an alternate backend service, verify traffic
    follows, then restore the original mapping.
    """
    logger.info('Running test_change_backend_service')
    original_backends = get_instance_names(gcp, instance_group)
    alternate_backends = get_instance_names(gcp, same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    wait_until_all_rpcs_go_to_given_backends(original_backends,
                                             _WAIT_FOR_STATS_SEC)
    try:
        # Switch the URL map to the alternate service; traffic should follow.
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(alternate_backends,
                                                 _WAIT_FOR_URL_MAP_PATCH_SEC)
    finally:
        # Restore the original mapping and detach the alternate backends.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
564
565
def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    """Verify failover behavior when most (but not all) primary instances
    stop serving: traffic should flow to the remaining primary instance plus
    the secondary locality, then return to the primary group once all
    primaries serve again.

    swapped_primary_and_secondary guards the single retry performed with the
    instance-group roles reversed (see the except branch below).
    """
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    try:
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(gcp, primary_instance_group,
                                  min_instances_for_gentle_failover)
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop serving on all but one primary instance (>50% of the group).
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            # Expect the surviving primary instance plus the secondary
            # locality to absorb all traffic.
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 backend_service,
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        # Restore the original configuration and wait for traffic to settle
        # back onto the primary group alone.
        patch_backend_service(gcp, backend_service, [primary_instance_group])
        resize_instance_group(gcp, primary_instance_group,
                              num_primary_instances)
        instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
620
621
def test_ping_pong(gcp, backend_service, instance_group):
    """Smoke test: every backend in the instance group receives traffic."""
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    backends = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(backends, _WAIT_FOR_STATS_SEC)
628
629
def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    """Attach two instance groups, then remove the one carrying traffic and
    verify traffic consolidates on the remaining group.
    """
    logger.info('Running test_remove_instance_group')
    try:
        patch_backend_service(gcp,
                              backend_service,
                              [instance_group, same_zone_instance_group],
                              balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        instance_names = get_instance_names(gcp, instance_group)
        same_zone_instance_names = get_instance_names(gcp,
                                                      same_zone_instance_group)
        try:
            # Expected case: both groups receive traffic; keep the same-zone
            # group and remove the first one.
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC)
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError as e:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is to continue
            # with the remainder of the test case.
            try:
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError as e:
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        # Drop down to only the remaining group; traffic must shift to it.
        patch_backend_service(gcp,
                              backend_service, [remaining_instance_group],
                              balancing_mode='RATE')
        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    finally:
        # NOTE(review): if an exception is raised before `instance_names` is
        # assigned above, this finally block itself raises NameError and
        # masks the original error — confirm whether that path is reachable.
        patch_backend_service(gcp, backend_service, [instance_group])
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
673
674
def test_round_robin(gcp, backend_service, instance_group):
    """Verify RPCs are spread evenly across backends, allowing each
    instance's count to differ from the exact mean by at most `threshold`.
    """
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    # TODO(ericgribkoff) Delayed config propagation from earlier tests
    # may result in briefly receiving an empty EDS update, resulting in failed
    # RPCs. Retry distribution validation if this occurs; long-term fix is
    # creating new backend resources for each individual test case.
    # Each attempt takes 10 seconds. Config propagation can take several
    # minutes.
    max_attempts = 40
    for _ in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        total_requests_received = sum(
            stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer)
        if total_requests_received != _NUM_TEST_RPCS:
            logger.info('Unexpected RPC failures, retrying: %s', stats)
            continue
        expected_requests = total_requests_received / len(instance_names)
        for instance in instance_names:
            if abs(stats.rpcs_by_peer[instance] -
                   expected_requests) > threshold:
                raise Exception(
                    'RPC peer distribution differs from expected by more than %d '
                    'for instance %s (%s)' % (threshold, instance, stats))
        return
    raise Exception('RPC failures persisted through %d retries' % max_attempts)
705
706
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify that while at least one primary instance keeps serving, the
    remaining primary instances absorb all traffic and the secondary
    locality receives none.

    swapped_primary_and_secondary guards the single retry performed with the
    instance-group roles reversed (see the except branch below).
    """
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop only one primary instance; the remaining primaries should
        # still receive all traffic (no failover to the secondary locality).
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(remaining_instances,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        # Restore the backend service to the primary group only.
        patch_backend_service(gcp, backend_service, [primary_instance_group])
752
753
def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify failover behavior on a *complete* primary-locality failure.

    All backends in the primary instance group are marked not-serving; the
    secondary locality's backends must then receive all traffic. The primary
    backends are restored to serving in all cases.

    Args:
      gcp: shared GCP state object (provides service_port).
      backend_service: the backend service under test.
      primary_instance_group: instance group expected to be the primary
        locality.
      secondary_instance_group: instance group expected to be the secondary
        locality.
      swapped_primary_and_secondary: internal flag; True only on the single
        retry where primary/secondary expectations have been swapped, to
        prevent infinite recursion.
    """
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            # Take the entire primary locality out of serving.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status, even if the wait above fails.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        # TD may have assigned localities opposite to our expectation (clients
        # can connect to a TD instance in a different region). Retry once with
        # the groups swapped if the "secondary" group is actually primary.
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        # Restore the backend service to the primary group only.
        patch_backend_service(gcp, backend_service, [primary_instance_group])
797
798
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    """Bring both backend services to a known state for URL-map tests.

    Attaches same_zone_instance_group to the alternate service, waits for
    both services' backends to become healthy, and waits until all traffic
    flows to the original service.

    Returns:
      A (original_instances, alternate_instances) pair of backend-name lists.
    """
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_instances)
    alternate_instances = get_instance_names(gcp, same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_instances)

    # Every URL-map test starts from "all traffic on the original service".
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_instances,
                                             _WAIT_FOR_STATS_SEC)
    return original_instances, alternate_instances
830
831
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    """Verify weighted traffic splitting between two backend services.

    This test starts with all traffic going to original_backend_service. Then
    it updates the URL map to set the default action to traffic splitting
    between original and alternate. It waits for all backends in both services
    to receive traffic, then verifies that the observed weights match the
    configured 20/80 split.

    Raises:
      Exception: if the RPC distribution still differs from the expected one
        after all retries.
    """
    logger.info('Running test_traffic_splitting')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # Patch urlmap, change route action to traffic splitting between
        # original and alternate.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Split percentage between instances: [20,80] -> [10,10,40,40]
        # (example assumes two instances per service).
        expected_instance_percentage = [
            original_service_percentage * 1.0 / len(original_backend_instances)
        ] * len(original_backend_instances) + [
            alternate_service_percentage * 1.0 /
            len(alternate_backend_instances)
        ] * len(alternate_backend_instances)

        # Wait for traffic to go to both services.
        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Verify that weights between two services are expected.
        retry_count = 10
        # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
        # seconds timeout.
        for i in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            got_instance_count = [
                stats.rpcs_by_peer[i] for i in original_backend_instances
            ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances]
            total_count = sum(got_instance_count)
            got_instance_percentage = [
                x * 100.0 / total_count for x in got_instance_count
            ]

            try:
                # 5 is the allowed deviation — presumably percentage points;
                # confirm against compare_distributions' definition.
                compare_distributions(got_instance_percentage,
                                      expected_instance_percentage, 5)
            except Exception as e:
                logger.info('attempt %d', i)
                logger.info('got percentage: %s', got_instance_percentage)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(e)
                # Only give up on the last attempt.
                if i == retry_count - 1:
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (got_instance_percentage, expected_instance_percentage))
            else:
                # Distribution matched; stop retrying.
                logger.info("success")
                break
    finally:
        # Restore the default URL-map action and detach the alternate service.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
903
904
def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    """Verify URL-map path matching (fullPathMatch and prefixMatch rules).

    This test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service. It then patches the URL map with route rules
    that send each RPC method to a different backend service, waits for all
    backends in both services to receive traffic, and verifies that traffic
    goes to the expected backends.

    Raises:
      ValueError: if the test client does not report per-method stats.
      Exception: if the observed per-method distribution never matches the
        expected one within the retry budget.
    """
    logger.info('Running test_path_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        'priority': 0,
                        # Prefix UnaryCall -> original_backend_service.
                        'matchRules': [{
                            'prefixMatch': '/grpc.testing.TestService/Unary'
                        }],
                        'service': original_backend_service.url
                    },
                    {
                        'priority': 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        'matchRules': [{
                            'fullPathMatch':
                                '/grpc.testing.TestService/EmptyCall'
                        }],
                        'service': alternate_backend_service.url
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                })
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s', route_rules)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 20
            # Each attempt takes about 10 seconds, 20 retries is equivalent to 200
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    # Previously the loop could exhaust every retry without
                    # ever matching and the test would silently pass; fail
                    # explicitly instead.
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: '
                        '%s' % expected_instances)
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
1010
1011
def test_header_matching(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify URL-map header matching (exact/prefix/suffix/invert rules).

    This test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service. It then patches the URL map with route rules
    that send RPCs carrying the test metadata to a different backend service,
    waits for all backends in both services to receive traffic, and verifies
    that traffic goes to the expected backends.

    Raises:
      ValueError: if the test client does not report per-method stats.
      Exception: if the observed per-method distribution never matches the
        expected one within the retry budget.
    """
    logger.info('Running test_header_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # Header ExactMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_EMPTY
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header PrefixMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header SuffixMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header invert ExactMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata, so will be sent to original.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_EMPTY,
                            'invertMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 20
            # Each attempt takes about 10 seconds, 20 retries is equivalent to 200
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    # Previously the loop could exhaust every retry without
                    # ever matching and the test would silently pass; fail
                    # explicitly instead.
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: '
                        '%s' % expected_instances)
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
1140
1141
def test_circuit_breaking(gcp, original_backend_service, instance_group,
                          same_zone_instance_group):
    '''Verify circuit breaking (maxRequests) on dedicated backend services.

    Since the backend service circuit_breakers configuration cannot be unset,
    which causes trouble for restoring the validate_for_proxy flag in the
    target proxy/global forwarding rule, this test uses dedicated backend
    services. The url_map and backend services undergo the following state
    changes:

    Before test:
       original_backend_service -> [instance_group]
       extra_backend_service -> []
       more_extra_backend_service -> []

       url_map -> [original_backend_service]

    In test:
       extra_backend_service (with circuit_breakers) -> [instance_group]
       more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]

       url_map -> [extra_backend_service, more_extra_backend_service]

    After test:
       original_backend_service -> [instance_group]
       extra_backend_service (with circuit_breakers) -> []
       more_extra_backend_service (with circuit_breakers) -> []

       url_map -> [original_backend_service]
    '''
    logger.info('Running test_circuit_breaking')
    # Track services created here so the finally block can delete them.
    additional_backend_services = []
    try:
        # TODO(chengyuanzhang): Dedicated backend services created for circuit
        # breaking test. Once the issue for unsetting backend service circuit
        # breakers is resolved or configuring backend service circuit breakers is
        # enabled for config validation, these dedicated backend services can be
        # eliminated.
        # NOTE(review): gcp_suffix and args are module-level globals defined
        # outside this view — confirm they are set before this test runs.
        extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
        more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
        extra_backend_service = add_backend_service(gcp,
                                                    extra_backend_service_name)
        additional_backend_services.append(extra_backend_service)
        more_extra_backend_service = add_backend_service(
            gcp, more_extra_backend_service_name)
        additional_backend_services.append(more_extra_backend_service)
        # The config validation for proxyless doesn't allow setting
        # circuit_breakers. Disable validate validate_for_proxyless
        # for this test. This can be removed when validation
        # accepts circuit_breakers.
        logger.info('disabling validate_for_proxyless in target proxy')
        set_validate_for_proxyless(gcp, False)
        extra_backend_service_max_requests = 500
        more_extra_backend_service_max_requests = 1000
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        logger.info('Waiting for extra backends to become healthy')
        wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
        patch_backend_service(gcp,
                              more_extra_backend_service,
                              [same_zone_instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      more_extra_backend_service_max_requests
                              })
        logger.info('Waiting for more extra backend to become healthy')
        wait_for_healthy_backends(gcp, more_extra_backend_service,
                                  same_zone_instance_group)
        extra_backend_instances = get_instance_names(gcp, instance_group)
        more_extra_backend_instances = get_instance_names(
            gcp, same_zone_instance_group)
        # Route each RPC method to its own (circuit-broken) backend service.
        route_rules = [
            {
                'priority': 0,
                # UnaryCall -> extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
                }],
                'service': extra_backend_service.url
            },
            {
                'priority': 1,
                # EmptyCall -> more_extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                }],
                'service': more_extra_backend_service.url
            },
        ]

        # Make client send UNARY_CALL and EMPTY_CALL.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ], [])
        logger.info('Patching url map with %s', route_rules)
        patch_url_map_backend_service(gcp,
                                      extra_backend_service,
                                      route_rules=route_rules)
        logger.info('Waiting for traffic to go to all backends')
        wait_until_all_rpcs_go_to_given_backends(
            extra_backend_instances + more_extra_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Make all calls keep-open so in-flight RPCs accumulate until the
        # circuit breaker's maxRequests threshold is reached.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
             'rpc-behavior', 'keep-open'),
            (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
             'rpc-behavior', 'keep-open')])
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state (%d)',
                    extra_backend_service_max_requests)
        wait_until_rpcs_in_flight(
            'EMPTY_CALL',
            (_WAIT_FOR_BACKEND_SEC +
             int(more_extra_backend_service_max_requests / args.qps)),
            more_extra_backend_service_max_requests, 1)
        logger.info('EMPTY_CALL reached stable state (%d)',
                    more_extra_backend_service_max_requests)

        # Increment circuit breakers max_requests threshold.
        extra_backend_service_max_requests = 800
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state after increase (%d)',
                    extra_backend_service_max_requests)
        logger.info('success')
        # Avoid new RPCs being outstanding (some test clients create threads
        # for sending RPCs) after restoring backend services.
        configure_client(
            [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], [])
    finally:
        # Restore the original URL map and backend service, and delete the
        # dedicated services created above.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, original_backend_service, [instance_group])
        for backend_service in additional_backend_services:
            delete_backend_service(gcp, backend_service)
        set_validate_for_proxyless(gcp, True)
1295
1296
def set_validate_for_proxyless(gcp, validate_for_proxyless):
    """Recreate the target proxy with the given validateForProxyless value.

    No-op (with a debug log) unless the alpha compute API is enabled, since
    validateForProxyless is only available there.
    """
    if not gcp.alpha_compute:
        logger.debug(
            'Not setting validateForProxy because alpha is not enabled')
        return
    # This function deletes global_forwarding_rule and target_proxy, then
    # recreate target_proxy with validateForProxyless=False. This is necessary
    # because patching target_grpc_proxy isn't supported.
    delete_global_forwarding_rule(gcp)
    delete_target_proxy(gcp)
    create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless)
    create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name,
                                  [gcp.service_port])
1310
1311
def get_serving_status(instance, service_port):
    """Query the standard gRPC health service on one backend instance."""
    target = '%s:%d' % (instance, service_port)
    with grpc.insecure_channel(target) as channel:
        stub = health_pb2_grpc.HealthStub(channel)
        return stub.Check(health_pb2.HealthCheckRequest())
1316
1317
def set_serving_status(instances, service_port, serving):
    """Set the serving status of each instance via XdsUpdateHealthService.

    For every instance, issues SetServing/SetNotServing and polls the health
    service until the reported status matches, retrying up to retry_count
    times.

    Args:
      instances: iterable of instance host names.
      service_port: port the test server listens on.
      serving: True to mark serving, False to mark not-serving.

    Raises:
      Exception: if an instance's reported status never matches after all
        retries.
    """
    logger.info('setting %s serving status to %s', instances, serving)
    for instance in instances:
        with grpc.insecure_channel('%s:%d' %
                                   (instance, service_port)) as channel:
            logger.info('setting %s serving status to %s', instance, serving)
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            retry_count = 5
            # Iterate over retry_count (was a hard-coded literal 5, which
            # could silently drift out of sync with the check below).
            for i in range(retry_count):
                if serving:
                    stub.SetServing(empty_pb2.Empty())
                else:
                    stub.SetNotServing(empty_pb2.Empty())
                serving_status = get_serving_status(instance, service_port)
                logger.info('got instance service status %s', serving_status)
                want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING
                if serving_status.status == want_status:
                    break
                if i == retry_count - 1:
                    raise Exception(
                        'failed to set instance service status after %d retries'
                        % retry_count)
1340
1341
def is_primary_instance_group(gcp, instance_group):
    """Return True if all client traffic currently lands on this group.

    Clients may connect to a TD instance in a different region than the
    client, in which case primary/secondary assignments may not be based on
    the client's actual locality — so this is determined empirically from
    client stats.
    """
    members = set(get_instance_names(gcp, instance_group))
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    return all(peer in members for peer in stats.rpcs_by_peer)
1349
1350
def get_startup_script(path_to_server_binary, service_port):
    """Build the VM startup script that launches the xDS test server.

    If a prebuilt server binary path is given, the script just launches it in
    the background; otherwise it clones and builds the grpc-java interop
    server on the VM before launching it.
    """
    if not path_to_server_binary:
        return """#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
pushd java_server
git clone https://github.com/grpc/grpc-java.git
pushd grpc-java
pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true

nohup build/install/grpc-interop-testing/bin/xds-test-server \
    --port=%d 1>/dev/null &""" % service_port
    return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
                                                 service_port)
1368
1369
def create_instance_template(gcp, name, network, source_image, machine_type,
                             startup_script):
    """Create a GCE instance template and record it as gcp.instance_template.

    The template tags instances so the health-check firewall rule applies,
    grants the default service account cloud-platform scope, and delivers
    the startup script through instance metadata.
    """
    properties = {
        'tags': {
            'items': ['allow-health-checks']
        },
        'machineType': machine_type,
        'serviceAccounts': [{
            'email': 'default',
            'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
        }],
        'networkInterfaces': [{
            'accessConfigs': [{
                'type': 'ONE_TO_ONE_NAT'
            }],
            'network': network
        }],
        'disks': [{
            'boot': True,
            'initializeParams': {
                'sourceImage': source_image
            }
        }],
        'metadata': {
            'items': [{
                'key': 'startup-script',
                'value': startup_script
            }]
        }
    }
    config = {'name': name, 'properties': properties}
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.instanceTemplates().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.instance_template = GcpResource(config['name'], result['targetLink'])
1409
1410
def add_instance_group(gcp, zone, name, size):
    """Create a managed instance group from gcp.instance_template, wait for
    it to reach the requested size, record it on gcp.instance_groups, and
    return the InstanceGroup handle.
    """
    config = {
        'name': name,
        'instanceTemplate': gcp.instance_template.url,
        'targetSize': size,
        'namedPorts': [{
            'name': 'grpc',
            'port': gcp.service_port
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    managers = gcp.compute.instanceGroupManagers()
    operation = managers.insert(
        project=gcp.project, zone=zone,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp, zone, operation['name'])
    # Re-read the manager to learn the URL of the instance group it owns.
    created = managers.get(
        project=gcp.project, zone=zone,
        instanceGroupManager=config['name']).execute(
            num_retries=_GCP_API_RETRIES)
    instance_group = InstanceGroup(config['name'], created['instanceGroup'],
                                   zone)
    gcp.instance_groups.append(instance_group)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size,
                                                   _WAIT_FOR_OPERATION_SEC)
    return instance_group
1437
1438
def create_health_check(gcp, name):
    """Create a health check and record it as gcp.health_check.

    A native GRPC health check is used when the alpha compute API is
    available; otherwise a TCP check on the 'grpc' named port is created.
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
        config = {
            'name': name,
            'type': 'GRPC',
            'grpcHealthCheck': {
                'portSpecification': 'USE_SERVING_PORT'
            }
        }
    else:
        compute_to_use = gcp.compute
        config = {
            'name': name,
            'type': 'TCP',
            'tcpHealthCheck': {
                'portName': 'grpc'
            }
        }
    logger.debug('Sending GCP request with body=%s', config)
    operation = compute_to_use.healthChecks().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
    gcp.health_check = GcpResource(config['name'], operation['targetLink'])
1463
1464
def create_health_check_firewall_rule(gcp, name):
    """Create an ingress firewall rule admitting health-check probes and
    record it as gcp.health_check_firewall_rule.

    The rule targets instances tagged 'allow-health-checks' (applied by
    the instance template) and allows TCP from the prober source ranges.
    """
    config = {
        'name': name,
        'direction': 'INGRESS',
        'allowed': [{
            'IPProtocol': 'tcp'
        }],
        'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
        'targetTags': ['allow-health-checks'],
    }
    logger.debug('Sending GCP request with body=%s', config)
    operation = gcp.compute.firewalls().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
    gcp.health_check_firewall_rule = GcpResource(config['name'],
                                                 operation['targetLink'])
1481
1482
def add_backend_service(gcp, name):
    """Create a backend service, append it to gcp.backend_services, and
    return the new GcpResource.

    GRPC protocol is used when the alpha compute API is available, HTTP2
    otherwise; either way the service is INTERNAL_SELF_MANAGED and wired
    to gcp.health_check.
    """
    if gcp.alpha_compute:
        compute_to_use, protocol = gcp.alpha_compute, 'GRPC'
    else:
        compute_to_use, protocol = gcp.compute, 'HTTP2'
    config = {
        'name': name,
        'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
        'healthChecks': [gcp.health_check.url],
        'portName': 'grpc',
        'protocol': protocol
    }
    logger.debug('Sending GCP request with body=%s', config)
    operation = compute_to_use.backendServices().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
    backend_service = GcpResource(config['name'], operation['targetLink'])
    gcp.backend_services.append(backend_service)
    return backend_service
1504
1505
def create_url_map(gcp, name, backend_service, host_name):
    """Create a URL map routing host_name to backend_service and record it
    as gcp.url_map."""
    path_matcher = {
        'name': _PATH_MATCHER_NAME,
        'defaultService': backend_service.url,
    }
    host_rule = {
        'hosts': [host_name],
        'pathMatcher': _PATH_MATCHER_NAME
    }
    config = {
        'name': name,
        'defaultService': backend_service.url,
        'pathMatchers': [path_matcher],
        'hostRules': [host_rule]
    }
    logger.debug('Sending GCP request with body=%s', config)
    operation = gcp.compute.urlMaps().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
    gcp.url_map = GcpResource(config['name'], operation['targetLink'])
1524
1525
def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
    """Rewrite the URL map's host rule to match host:port.

    Used when the forwarding rule landed on a non-default port, so that
    clients dialing host:port still match the host rule. backend_service
    is unused but kept for signature compatibility with callers.
    """
    host_with_port = '%s:%d' % (host_name, gcp.service_port)
    config = {
        'hostRules': [{
            'hosts': [host_with_port],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    operation = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
1538
1539
def create_target_proxy(gcp, name, validate_for_proxyless=True):
    """Create the target proxy pointing at gcp.url_map and record it as
    gcp.target_proxy.

    Creates a targetGrpcProxy when the alpha compute API is available
    (optionally validated for proxyless gRPC); otherwise falls back to a
    targetHttpProxy.
    """
    if gcp.alpha_compute:
        # NOTE(review): these body keys are snake_case ('url_map',
        # 'validate_for_proxyless') unlike the camelCase used in every
        # other request body in this file — presumably what this API
        # surface accepts; confirm before changing.
        config = {
            'name': name,
            'url_map': gcp.url_map.url,
            'validate_for_proxyless': validate_for_proxyless
        }
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.alpha_compute.targetGrpcProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    else:
        config = {
            'name': name,
            'url_map': gcp.url_map.url,
        }
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.compute.targetHttpProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.target_proxy = GcpResource(config['name'], result['targetLink'])
1562
1563
def create_global_forwarding_rule(gcp, name, potential_ports):
    """Create a global forwarding rule on the first port that works.

    Tries each port in potential_ports until an insert succeeds, then
    records the rule as gcp.global_forwarding_rule and the chosen port as
    gcp.service_port. If every port fails, the function returns with
    gcp.service_port left unset; callers are expected to check it.

    Note: reads the module-level `args.network` for the VPC network.
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    for port in potential_ports:
        try:
            config = {
                'name': name,
                'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
                'portRange': str(port),
                'IPAddress': '0.0.0.0',
                'network': args.network,
                'target': gcp.target_proxy.url,
            }
            logger.debug('Sending GCP request with body=%s', config)
            result = compute_to_use.globalForwardingRules().insert(
                project=gcp.project,
                body=config).execute(num_retries=_GCP_API_RETRIES)
            wait_for_global_operation(gcp, result['name'])
            gcp.global_forwarding_rule = GcpResource(config['name'],
                                                     result['targetLink'])
            gcp.service_port = port
            return
        except googleapiclient.errors.HttpError as http_error:
            # Use lazy %-style logger arguments (consistent with the rest
            # of this file) instead of eagerly formatting with `%`.
            logger.warning(
                'Got error %s when attempting to create forwarding rule to '
                '0.0.0.0:%d. Retrying with another port.', http_error, port)
1592
1593
def get_health_check(gcp, health_check_name):
    """Look up an existing health check and record it as gcp.health_check."""
    response = gcp.compute.healthChecks().get(
        project=gcp.project, healthCheck=health_check_name).execute()
    gcp.health_check = GcpResource(health_check_name, response['selfLink'])
1598
1599
def get_health_check_firewall_rule(gcp, firewall_name):
    """Look up an existing firewall rule and record it as
    gcp.health_check_firewall_rule."""
    response = gcp.compute.firewalls().get(project=gcp.project,
                                           firewall=firewall_name).execute()
    gcp.health_check_firewall_rule = GcpResource(firewall_name,
                                                 response['selfLink'])
1605
1606
def get_backend_service(gcp, backend_service_name):
    """Look up an existing backend service, append it to
    gcp.backend_services, and return it."""
    response = gcp.compute.backendServices().get(
        project=gcp.project, backendService=backend_service_name).execute()
    service = GcpResource(backend_service_name, response['selfLink'])
    gcp.backend_services.append(service)
    return service
1613
1614
def get_url_map(gcp, url_map_name):
    """Look up an existing URL map and record it as gcp.url_map."""
    response = gcp.compute.urlMaps().get(project=gcp.project,
                                         urlMap=url_map_name).execute()
    gcp.url_map = GcpResource(url_map_name, response['selfLink'])
1619
1620
def get_target_proxy(gcp, target_proxy_name):
    """Look up an existing target proxy (gRPC flavor when the alpha compute
    API is in use, HTTP otherwise) and record it as gcp.target_proxy."""
    if gcp.alpha_compute:
        response = gcp.alpha_compute.targetGrpcProxies().get(
            project=gcp.project, targetGrpcProxy=target_proxy_name).execute()
    else:
        response = gcp.compute.targetHttpProxies().get(
            project=gcp.project, targetHttpProxy=target_proxy_name).execute()
    gcp.target_proxy = GcpResource(target_proxy_name, response['selfLink'])
1629
1630
def get_global_forwarding_rule(gcp, forwarding_rule_name):
    """Look up an existing global forwarding rule and record it as
    gcp.global_forwarding_rule."""
    response = gcp.compute.globalForwardingRules().get(
        project=gcp.project, forwardingRule=forwarding_rule_name).execute()
    gcp.global_forwarding_rule = GcpResource(forwarding_rule_name,
                                             response['selfLink'])
1636
1637
def get_instance_template(gcp, template_name):
    """Look up an existing instance template and record it as
    gcp.instance_template."""
    response = gcp.compute.instanceTemplates().get(
        project=gcp.project, instanceTemplate=template_name).execute()
    gcp.instance_template = GcpResource(template_name, response['selfLink'])
1642
1643
def get_instance_group(gcp, zone, instance_group_name):
    """Look up an existing instance group, record its named port as
    gcp.service_port, append the group to gcp.instance_groups, and
    return it.

    NOTE(review): assumes the group's first named port is the gRPC port —
    confirm if groups can carry more than one named port.
    """
    response = gcp.compute.instanceGroups().get(
        project=gcp.project, zone=zone,
        instanceGroup=instance_group_name).execute()
    gcp.service_port = response['namedPorts'][0]['port']
    group = InstanceGroup(instance_group_name, response['selfLink'], zone)
    gcp.instance_groups.append(group)
    return group
1653
1654
def delete_global_forwarding_rule(gcp):
    """Best-effort delete of the global forwarding rule; HTTP errors are
    logged and swallowed so cleanup can continue."""
    try:
        operation = gcp.compute.globalForwardingRules().delete(
            project=gcp.project,
            forwardingRule=gcp.global_forwarding_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1664
1665
def delete_target_proxy(gcp):
    """Best-effort delete of the target proxy (gRPC flavor when the alpha
    compute API is in use, HTTP otherwise); HTTP errors are logged and
    swallowed so cleanup can continue."""
    try:
        if gcp.alpha_compute:
            operation = gcp.alpha_compute.targetGrpcProxies().delete(
                project=gcp.project,
                targetGrpcProxy=gcp.target_proxy.name).execute(
                    num_retries=_GCP_API_RETRIES)
        else:
            operation = gcp.compute.targetHttpProxies().delete(
                project=gcp.project,
                targetHttpProxy=gcp.target_proxy.name).execute(
                    num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1681
1682
def delete_url_map(gcp):
    """Best-effort delete of the URL map; HTTP errors are logged and
    swallowed so cleanup can continue."""
    try:
        operation = gcp.compute.urlMaps().delete(
            project=gcp.project,
            urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1691
1692
def delete_backend_service(gcp, backend_service):
    """Best-effort delete of one backend service; HTTP errors are logged
    and swallowed so cleanup can continue."""
    try:
        operation = gcp.compute.backendServices().delete(
            project=gcp.project, backendService=backend_service.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1701
1702
def delete_backend_services(gcp):
    """Delete every backend service recorded on the GcpState."""
    for service in gcp.backend_services:
        delete_backend_service(gcp, service)
1706
1707
def delete_firewall(gcp):
    """Best-effort delete of the health-check firewall rule; HTTP errors
    are logged and swallowed so cleanup can continue."""
    try:
        operation = gcp.compute.firewalls().delete(
            project=gcp.project,
            firewall=gcp.health_check_firewall_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1717
1718
def delete_health_check(gcp):
    """Best-effort delete of the health check; HTTP errors are logged and
    swallowed so cleanup can continue."""
    try:
        operation = gcp.compute.healthChecks().delete(
            project=gcp.project, healthCheck=gcp.health_check.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1727
1728
def delete_instance_groups(gcp):
    """Best-effort delete of every managed instance group recorded on the
    GcpState; HTTP errors are logged per group and skipped."""
    for group in gcp.instance_groups:
        try:
            operation = gcp.compute.instanceGroupManagers().delete(
                project=gcp.project,
                zone=group.zone,
                instanceGroupManager=group.name).execute(
                    num_retries=_GCP_API_RETRIES)
            # Deleting instances can be slow, so use the longer backend
            # timeout rather than the default operation timeout.
            wait_for_zone_operation(gcp,
                                    group.zone,
                                    operation['name'],
                                    timeout_sec=_WAIT_FOR_BACKEND_SEC)
        except googleapiclient.errors.HttpError as http_error:
            logger.info('Delete failed: %s', http_error)
1743
1744
def delete_instance_template(gcp):
    """Best-effort delete of the instance template; HTTP errors are logged
    and swallowed so cleanup can continue."""
    try:
        operation = gcp.compute.instanceTemplates().delete(
            project=gcp.project,
            instanceTemplate=gcp.instance_template.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1754
1755
def patch_backend_service(gcp,
                          backend_service,
                          instance_groups,
                          balancing_mode='UTILIZATION',
                          circuit_breakers=None):
    """Point the backend service at the given instance groups.

    Each instance group becomes one backend entry; maxRate is set to 1
    only under RATE balancing (None otherwise). circuit_breakers is
    passed through to the request body unchanged.
    """
    compute_to_use = gcp.alpha_compute if gcp.alpha_compute else gcp.compute
    backends = []
    for group in instance_groups:
        backends.append({
            'group': group.url,
            'balancingMode': balancing_mode,
            'maxRate': 1 if balancing_mode == 'RATE' else None
        })
    config = {
        'backends': backends,
        'circuitBreakers': circuit_breakers,
    }
    logger.debug('Sending GCP request with body=%s', config)
    operation = compute_to_use.backendServices().patch(
        project=gcp.project, backendService=backend_service.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp,
                              operation['name'],
                              timeout_sec=_WAIT_FOR_BACKEND_SEC)
1780
1781
def resize_instance_group(gcp,
                          instance_group,
                          new_size,
                          timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Resize a managed instance group and wait until it reaches new_size.

    timeout_sec bounds only the wait for the group to actually contain
    new_size instances; the resize operation itself is waited on with a
    fixed 360-second timeout below (NOTE(review): hardcoded rather than
    using timeout_sec — looks intentional, but confirm).
    """
    result = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp,
                            instance_group.zone,
                            result['name'],
                            timeout_sec=360)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   new_size, timeout_sec)
1797
1798
def patch_url_map_backend_service(gcp,
                                  backend_service=None,
                                  services_with_weights=None,
                                  route_rules=None):
    """Change the URL map's path matcher to route to new backend service(s).

    At most one of the two routing styles may be supplied:
      backend_service: single service installed as the matcher's default.
      services_with_weights: dict mapping service -> weight, installed as
          a weighted defaultRouteAction.
    route_rules is passed through to the path matcher unchanged.

    Raises:
        ValueError: if both backend_service and services_with_weights are
            provided.
    """
    if backend_service and services_with_weights:
        # Fixed: the original message named a non-existent parameter
        # ('service_with_weights'); use the actual parameter name.
        raise ValueError(
            'both backend_service and services_with_weights are not None.')

    default_service = backend_service.url if backend_service else None
    default_route_action = {
        'weightedBackendServices': [{
            'backendService': service.url,
            'weight': w,
        } for service, w in services_with_weights.items()]
    } if services_with_weights else None

    config = {
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': default_service,
            'defaultRouteAction': default_route_action,
            'routeRules': route_rules,
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=gcp.url_map.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
1832
1833
def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   expected_size, timeout_sec):
    """Poll until the instance group contains exactly expected_size
    instances, raising if that does not happen within timeout_sec."""
    deadline = time.time() + timeout_sec
    while True:
        current_size = len(get_instance_names(gcp, instance_group))
        if current_size == expected_size:
            return
        if time.time() > deadline:
            raise Exception(
                'Instance group had expected size %d but actual size %d' %
                (expected_size, current_size))
        time.sleep(2)
1846
1847
def wait_for_global_operation(gcp,
                              operation,
                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a global compute operation until it reports DONE.

    Raises if the finished operation carries an error, or if the deadline
    passes before it completes.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        result = gcp.compute.globalOperations().get(
            project=gcp.project,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
1863
1864
def wait_for_zone_operation(gcp,
                            zone,
                            operation,
                            timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a zonal compute operation until it reports DONE.

    Raises if the finished operation carries an error, or if the deadline
    passes before it completes.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        result = gcp.compute.zoneOperations().get(
            project=gcp.project, zone=zone,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
1881
1882
def wait_for_healthy_backends(gcp,
                              backend_service,
                              instance_group,
                              timeout_sec=_WAIT_FOR_BACKEND_SEC):
    """Poll until GCP reports every instance in instance_group HEALTHY
    for the given backend service, or raise after timeout_sec.

    Each iteration also queries every instance's own serving status over
    gRPC purely for diagnostic logging (failures there are logged, not
    fatal).
    """
    start_time = time.time()
    config = {'group': instance_group.url}
    instance_names = get_instance_names(gcp, instance_group)
    expected_size = len(instance_names)
    while time.time() - start_time <= timeout_sec:
        for instance_name in instance_names:
            try:
                # Log each backend's self-reported health to help debug
                # disagreements with GCP's getHealth view below.
                status = get_serving_status(instance_name, gcp.service_port)
                logger.info('serving status response from %s: %s',
                            instance_name, status)
            except grpc.RpcError as rpc_error:
                logger.info('checking serving status of %s failed: %s',
                            instance_name, rpc_error)
        result = gcp.compute.backendServices().getHealth(
            project=gcp.project,
            backendService=backend_service.name,
            body=config).execute(num_retries=_GCP_API_RETRIES)
        if 'healthStatus' in result:
            logger.info('received GCP healthStatus: %s', result['healthStatus'])
            healthy = True
            for instance in result['healthStatus']:
                if instance['healthState'] != 'HEALTHY':
                    healthy = False
                    break
            # Require the whole group to be reporting, not just a healthy
            # subset of it.
            if healthy and expected_size == len(result['healthStatus']):
                return
        else:
            logger.info('no healthStatus received from GCP')
        time.sleep(5)
    # NOTE(review): if timeout_sec <= 0 the loop never executes and
    # 'result' is unbound here — confirm callers never pass a
    # non-positive timeout.
    raise Exception('Not all backends became healthy within %d seconds: %s' %
                    (timeout_sec, result))
1918
1919
def get_instance_names(gcp, instance_group):
    """Return the short names of all instances in the given group
    (empty list if the group currently has none)."""
    result = gcp.compute.instanceGroups().listInstances(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroup=instance_group.name,
        body={
            'instanceState': 'ALL'
        }).execute(num_retries=_GCP_API_RETRIES)
    if 'items' not in result:
        return []
    # listInstances() returns the full URL of each instance, which ends
    # with the instance name. compute.instances().get() requires the bare
    # instance name (not the full URL), so extract the last path segment.
    instance_names = [
        item['instance'].split('/')[-1] for item in result['items']
    ]
    logger.info('retrieved instance names: %s', instance_names)
    return instance_names
1940
1941
def clean_up(gcp):
    """Tear down all GCP resources recorded on the GcpState, dependents
    first: forwarding rule -> target proxy -> URL map -> backend services
    -> firewall -> health check -> instance groups -> template. The
    delete helpers swallow HTTP errors, so cleanup proceeds even when a
    resource is already gone."""
    if gcp.global_forwarding_rule is not None:
        delete_global_forwarding_rule(gcp)
    if gcp.target_proxy is not None:
        delete_target_proxy(gcp)
    if gcp.url_map is not None:
        delete_url_map(gcp)
    delete_backend_services(gcp)
    if gcp.health_check_firewall_rule is not None:
        delete_firewall(gcp)
    if gcp.health_check is not None:
        delete_health_check(gcp)
    delete_instance_groups(gcp)
    if gcp.instance_template is not None:
        delete_instance_template(gcp)
1957
1958
class InstanceGroup(object):
    """Handle for a GCE managed instance group used by this script.

    Attributes:
        name: instance group name.
        url: fully-qualified resource URL of the group.
        zone: GCE zone the group lives in.
    """

    def __init__(self, name, url, zone):
        self.name = name
        self.url = url
        self.zone = zone

    def __repr__(self):
        # Make logged/debugged instances readable instead of the default
        # object repr.
        return 'InstanceGroup(name=%r, url=%r, zone=%r)' % (
            self.name, self.url, self.zone)
1965
1966
class GcpResource(object):
    """Name plus self-link/target URL of a created or fetched GCP resource.

    Attributes:
        name: resource name.
        url: the resource's selfLink/targetLink URL.
    """

    def __init__(self, name, url):
        self.name = name
        self.url = url

    def __repr__(self):
        # Make logged/debugged instances readable instead of the default
        # object repr.
        return 'GcpResource(name=%r, url=%r)' % (self.name, self.url)
1972
1973
class GcpState(object):
    # Mutable registry of every GCP resource used by a test run. The
    # create_*/get_*/add_* helpers populate it incrementally and
    # clean_up() tears everything down.

    def __init__(self, compute, alpha_compute, project):
        # v1 compute API client; alpha_compute is the alpha client or None
        # when only stable GCP APIs are allowed.
        self.compute = compute
        self.alpha_compute = alpha_compute
        self.project = project
        # Resources below are None (or empty list) until the corresponding
        # helper creates or looks them up.
        self.health_check = None
        self.health_check_firewall_rule = None
        self.backend_services = []
        self.url_map = None
        self.target_proxy = None
        self.global_forwarding_rule = None
        # Port chosen when the forwarding rule is created, or read from an
        # existing instance group's named ports.
        self.service_port = None
        self.instance_template = None
        self.instance_groups = []
1989
1990
alpha_compute = None
# Build the googleapiclient compute clients used for the rest of the run
# (`args` comes from argument parsing earlier in the file). If a local
# discovery document was supplied, build the client from it; otherwise
# discover the live 'compute' v1 API. The alpha client is built only when
# --only_stable_gcp_apis is off (and, for the document path, an alpha
# discovery document was also supplied).
if args.compute_discovery_document:
    with open(args.compute_discovery_document, 'r') as discovery_doc:
        compute = googleapiclient.discovery.build_from_document(
            discovery_doc.read())
    if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
        with open(args.alpha_compute_discovery_document, 'r') as discovery_doc:
            alpha_compute = googleapiclient.discovery.build_from_document(
                discovery_doc.read())
else:
    compute = googleapiclient.discovery.build('compute', 'v1')
    if not args.only_stable_gcp_apis:
        alpha_compute = googleapiclient.discovery.build('compute', 'alpha')
2004
2005try:
2006    gcp = GcpState(compute, alpha_compute, args.project_id)
2007    gcp_suffix = args.gcp_suffix
2008    health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
2009    if not args.use_existing_gcp_resources:
2010        if args.keep_gcp_resources:
2011            # Auto-generating a unique suffix in case of conflict should not be
2012            # combined with --keep_gcp_resources, as the suffix actually used
2013            # for GCP resources will not match the provided --gcp_suffix value.
2014            num_attempts = 1
2015        else:
2016            num_attempts = 5
2017        for i in range(num_attempts):
2018            try:
2019                logger.info('Using GCP suffix %s', gcp_suffix)
2020                create_health_check(gcp, health_check_name)
2021                break
2022            except googleapiclient.errors.HttpError as http_error:
2023                gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999))
2024                health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
2025                logger.exception('HttpError when creating health check')
2026        if gcp.health_check is None:
2027            raise Exception('Failed to create health check name after %d '
2028                            'attempts' % num_attempts)
2029    firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
2030    backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
2031    alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix
2032    url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
2033    service_host_name = _BASE_SERVICE_HOST + gcp_suffix
2034    target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
2035    forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
2036    template_name = _BASE_TEMPLATE_NAME + gcp_suffix
2037    instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
2038    same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix
2039    secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix
2040    if args.use_existing_gcp_resources:
2041        logger.info('Reusing existing GCP resources')
2042        get_health_check(gcp, health_check_name)
2043        try:
2044            get_health_check_firewall_rule(gcp, firewall_name)
2045        except googleapiclient.errors.HttpError as http_error:
2046            # Firewall rule may be auto-deleted periodically depending on GCP
2047            # project settings.
2048            logger.exception('Failed to find firewall rule, recreating')
2049            create_health_check_firewall_rule(gcp, firewall_name)
2050        backend_service = get_backend_service(gcp, backend_service_name)
2051        alternate_backend_service = get_backend_service(
2052            gcp, alternate_backend_service_name)
2053        get_url_map(gcp, url_map_name)
2054        get_target_proxy(gcp, target_proxy_name)
        # Re-using pre-existing GCP resources (presumably guarded by a
        # "use existing resources" flag just above this view): look each one
        # up by name so we fail fast if anything is missing.
        get_global_forwarding_rule(gcp, forwarding_rule_name)
        get_instance_template(gcp, template_name)
        instance_group = get_instance_group(gcp, args.zone, instance_group_name)
        same_zone_instance_group = get_instance_group(
            gcp, args.zone, same_zone_instance_group_name)
        secondary_zone_instance_group = get_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name)
    else:
        # Create the full Traffic Director topology from scratch: firewall
        # rule for health checks, two backend services, URL map, target
        # proxy, global forwarding rule, instance template, and three
        # managed instance groups (primary, same-zone, secondary-zone).
        create_health_check_firewall_rule(gcp, firewall_name)
        backend_service = add_backend_service(gcp, backend_service_name)
        alternate_backend_service = add_backend_service(
            gcp, alternate_backend_service_name)
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name)
        # Shuffle the candidate ports so concurrent runs sharing the project
        # are less likely to race for the same forwarding-rule port.
        potential_service_ports = list(args.service_port_range)
        random.shuffle(potential_service_ports)
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      potential_service_ports)
        # create_global_forwarding_rule is expected to record the port it
        # claimed on gcp.service_port; absence means no candidate worked.
        if not gcp.service_port:
            raise Exception(
                'Failed to find a valid ip:port for the forwarding rule')
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            # A non-default port must also appear in the URL map host rule so
            # requests to "host:port" are matched and routed.
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
        # Backend VMs start the test server on the chosen port via this
        # startup script baked into the instance template.
        startup_script = get_startup_script(args.path_to_server_binary,
                                            gcp.service_port)
        create_instance_template(gcp, template_name, args.network,
                                 args.source_image, args.machine_type,
                                 startup_script)
        instance_group = add_instance_group(gcp, args.zone, instance_group_name,
                                            _INSTANCE_GROUP_SIZE)
        # Only the primary instance group serves traffic initially; the
        # same-zone and secondary-zone groups are attached/detached by
        # individual test cases below.
        patch_backend_service(gcp, backend_service, [instance_group])
        same_zone_instance_group = add_instance_group(
            gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
        secondary_zone_instance_group = add_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name,
            _INSTANCE_GROUP_SIZE)
2093
    # Block until the primary instance group reports healthy backends;
    # starting tests earlier would surface spurious RPC failures.
    wait_for_healthy_backends(gcp, backend_service, instance_group)

    if args.test_case:
        # Environment for the spawned test client: current process env plus
        # the xDS bootstrap configuration assembled below.
        client_env = dict(os.environ)
        bootstrap_server_features = []
        # Omit the port from the target URI when it is the default port.
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            server_uri = service_host_name
        else:
            server_uri = service_host_name + ':' + str(gcp.service_port)
        if args.xds_v3_support:
            client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true'
            bootstrap_server_features.append('xds_v3')
        if args.bootstrap_file:
            bootstrap_path = os.path.abspath(args.bootstrap_file)
        else:
            # delete=False keeps the file on disk after the 'with' block so
            # the client subprocess can read it via GRPC_XDS_BOOTSTRAP.
            # NOTE(review): the temp file is never removed afterwards.
            with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
                bootstrap_file.write(
                    _BOOTSTRAP_TEMPLATE.format(
                        node_id=socket.gethostname(),
                        server_features=json.dumps(
                            bootstrap_server_features)).encode('utf-8'))
                bootstrap_path = bootstrap_file.name
        client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
        client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
        # test case name -> [jobset.JobResult], consumed by the junit report.
        test_results = {}
        failed_tests = []
        for test_case in args.test_case:
            result = jobset.JobResult()
            log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
            test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
            # Captures the client's stdout/stderr; closed in the 'finally'
            # clause below. NOTE(review): could be a context manager.
            test_log_file = open(test_log_filename, 'w+')
            client_process = None

            # Some test cases need traffic on both RPC methods (e.g. traffic
            # splitting / matching tests); the rest only send UnaryCall.
            if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
                rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
            else:
                rpcs_to_send = '--rpc="UnaryCall"'

            if test_case in _TESTS_TO_SEND_METADATA:
                metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU}"'.format(
                    keyE=_TEST_METADATA_KEY,
                    valueE=_TEST_METADATA_VALUE_EMPTY,
                    keyU=_TEST_METADATA_KEY,
                    valueU=_TEST_METADATA_VALUE_UNARY)
            else:
                # Setting the arg explicitly to empty with '--metadata=""'
                # makes C# client fail
                # (see https://github.com/commandlineparser/commandline/issues/412),
                # so instead we just rely on clients using the default when
                # metadata arg is not specified.
                metadata_to_send = ''

            # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
            # in the client. This means we will ignore intermittent RPC
            # failures (but this framework still checks that the final result
            # is as expected).
            #
            # Reason for disabling this is, the resources are shared by
            # multiple tests, and a change in previous test could be delayed
            # until the second test starts. The second test may see
            # intermittent failures because of that.
            #
            # A fix is to not share resources between tests (though that does
            # mean the tests will be significantly slower due to creating new
            # resources).
            fail_on_failed_rpc = ''

            try:
                # When CLIENT_HOSTS is set, clients are assumed to be running
                # externally (presumably managed out-of-band — see where
                # CLIENT_HOSTS is defined); otherwise spawn a local client.
                if not CLIENT_HOSTS:
                    client_cmd_formatted = args.client_cmd.format(
                        server_uri=server_uri,
                        stats_port=args.stats_port,
                        qps=args.qps,
                        fail_on_failed_rpc=fail_on_failed_rpc,
                        rpcs_to_send=rpcs_to_send,
                        metadata_to_send=metadata_to_send)
                    logger.debug('running client: %s', client_cmd_formatted)
                    client_cmd = shlex.split(client_cmd_formatted)
                    client_process = subprocess.Popen(client_cmd,
                                                      env=client_env,
                                                      stderr=subprocess.STDOUT,
                                                      stdout=test_log_file)
                # Dispatch to the test implementation by name.
                if test_case == 'backends_restart':
                    test_backends_restart(gcp, backend_service, instance_group)
                elif test_case == 'change_backend_service':
                    test_change_backend_service(gcp, backend_service,
                                                instance_group,
                                                alternate_backend_service,
                                                same_zone_instance_group)
                elif test_case == 'gentle_failover':
                    test_gentle_failover(gcp, backend_service, instance_group,
                                         secondary_zone_instance_group)
                elif test_case == 'ping_pong':
                    test_ping_pong(gcp, backend_service, instance_group)
                elif test_case == 'remove_instance_group':
                    test_remove_instance_group(gcp, backend_service,
                                               instance_group,
                                               same_zone_instance_group)
                elif test_case == 'round_robin':
                    test_round_robin(gcp, backend_service, instance_group)
                elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
                    test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
                    test_secondary_locality_gets_requests_on_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'traffic_splitting':
                    test_traffic_splitting(gcp, backend_service, instance_group,
                                           alternate_backend_service,
                                           same_zone_instance_group)
                elif test_case == 'path_matching':
                    test_path_matching(gcp, backend_service, instance_group,
                                       alternate_backend_service,
                                       same_zone_instance_group)
                elif test_case == 'header_matching':
                    test_header_matching(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'circuit_breaking':
                    test_circuit_breaking(gcp, backend_service, instance_group,
                                          same_zone_instance_group)
                else:
                    logger.error('Unknown test case: %s', test_case)
                    sys.exit(1)
                # poll() returning non-None means the client died while the
                # test was running — treat that as a test failure.
                if client_process and client_process.poll() is not None:
                    raise Exception(
                        'Client process exited prematurely with exit code %d' %
                        client_process.returncode)
                result.state = 'PASSED'
                result.returncode = 0
            except Exception as e:
                # Record the failure but keep running the remaining cases;
                # the aggregated exit status is decided after the loop.
                logger.exception('Test case %s failed', test_case)
                failed_tests.append(test_case)
                result.state = 'FAILED'
                result.message = str(e)
            finally:
                if client_process:
                    # A truthy returncode means the client already exited
                    # (with an error); otherwise it is still running and must
                    # be terminated before the next test case.
                    if client_process.returncode:
                        logger.info('Client exited with code %d' %
                                    client_process.returncode)
                    else:
                        client_process.terminate()
                test_log_file.close()
                # Workaround for Python 3, as report_utils will invoke decode() on
                # result.message, which has a default value of ''.
                result.message = result.message.encode('UTF-8')
                test_results[test_case] = [result]
                if args.log_client_output:
                    logger.info('Client output:')
                    with open(test_log_filename, 'r') as client_output:
                        logger.info(client_output.read())
        if not os.path.exists(_TEST_LOG_BASE_DIR):
            os.makedirs(_TEST_LOG_BASE_DIR)
        # Emit a junit-style XML report consumed by the results dashboard.
        report_utils.render_junit_xml_report(test_results,
                                             os.path.join(
                                                 _TEST_LOG_BASE_DIR,
                                                 _SPONGE_XML_NAME),
                                             suite_name='xds_tests',
                                             multi_target=True)
        # Non-zero exit if any case failed, after all cases have run and the
        # report has been written.
        if failed_tests:
            logger.error('Test case(s) %s failed', failed_tests)
            sys.exit(1)
finally:
    # Tear down all GCP resources created above unless the caller explicitly
    # asked to keep them (useful for debugging or reusing resources across
    # runs). Runs even when sys.exit() was called above, since SystemExit
    # propagates through this module-level try/finally.
    if not args.keep_gcp_resources:
        logger.info('Cleaning up GCP resources. This may take some time.')
        clean_up(gcp)