1#!/usr/bin/env python 2# Copyright 2020 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Run xDS integration tests on GCP using Traffic Director.""" 16 17import argparse 18import googleapiclient.discovery 19import grpc 20import json 21import logging 22import os 23import random 24import shlex 25import socket 26import subprocess 27import sys 28import tempfile 29import time 30 31from oauth2client.client import GoogleCredentials 32 33import python_utils.jobset as jobset 34import python_utils.report_utils as report_utils 35 36from src.proto.grpc.health.v1 import health_pb2 37from src.proto.grpc.health.v1 import health_pb2_grpc 38from src.proto.grpc.testing import empty_pb2 39from src.proto.grpc.testing import messages_pb2 40from src.proto.grpc.testing import test_pb2_grpc 41 42logger = logging.getLogger() 43console_handler = logging.StreamHandler() 44formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') 45console_handler.setFormatter(formatter) 46logger.handlers = [] 47logger.addHandler(console_handler) 48logger.setLevel(logging.WARNING) 49 50_TEST_CASES = [ 51 'backends_restart', 52 'change_backend_service', 53 'gentle_failover', 54 'ping_pong', 55 'remove_instance_group', 56 'round_robin', 57 'secondary_locality_gets_no_requests_on_partial_primary_failure', 58 'secondary_locality_gets_requests_on_primary_failure', 59 'traffic_splitting', 60] 61# Valid test cases, but not in all. So the tests can only run manually, and 62# aren't enabled automatically for all languages. 63# 64# TODO: Move them into _TEST_CASES when support is ready in all languages. 65_ADDITIONAL_TEST_CASES = [ 66 'path_matching', 67 'header_matching', 68 'circuit_breaking', 69] 70 71 72def parse_test_cases(arg): 73 if arg == '': 74 return [] 75 arg_split = arg.split(',') 76 test_cases = set() 77 all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES 78 for arg in arg_split: 79 if arg == "all": 80 test_cases = test_cases.union(_TEST_CASES) 81 else: 82 test_cases = test_cases.union([arg]) 83 if not all([test_case in all_test_cases for test_case in test_cases]): 84 raise Exception('Failed to parse test cases %s' % arg) 85 # Perserve order. 86 return [x for x in all_test_cases if x in test_cases] 87 88 89def parse_port_range(port_arg): 90 try: 91 port = int(port_arg) 92 return range(port, port + 1) 93 except: 94 port_min, port_max = port_arg.split(':') 95 return range(int(port_min), int(port_max) + 1) 96 97 98argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP') 99argp.add_argument('--project_id', help='GCP project id') 100argp.add_argument( 101 '--gcp_suffix', 102 default='', 103 help='Optional suffix for all generated GCP resource names. Useful to ' 104 'ensure distinct names across test runs.') 105argp.add_argument( 106 '--test_case', 107 default='ping_pong', 108 type=parse_test_cases, 109 help='Comma-separated list of test cases to run. Available tests: %s, ' 110 '(or \'all\' to run every test). ' 111 'Alternative tests not included in \'all\': %s' % 112 (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES))) 113argp.add_argument( 114 '--bootstrap_file', 115 default='', 116 help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in ' 117 'bootstrap generation') 118argp.add_argument( 119 '--xds_v3_support', 120 default=False, 121 action='store_true', 122 help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. ' 123 'If a pre-created bootstrap file is provided via the --bootstrap_file ' 124 'parameter, it should include xds_v3 in its server_features field.') 125argp.add_argument( 126 '--client_cmd', 127 default=None, 128 help='Command to launch xDS test client. {server_uri}, {stats_port} and ' 129 '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP ' 130 'will be set for the command') 131argp.add_argument( 132 '--client_hosts', 133 default=None, 134 help='Comma-separated list of hosts running client processes. If set, ' 135 '--client_cmd is ignored and client processes are assumed to be running on ' 136 'the specified hosts.') 137argp.add_argument('--zone', default='us-central1-a') 138argp.add_argument('--secondary_zone', 139 default='us-west1-b', 140 help='Zone to use for secondary TD locality tests') 141argp.add_argument('--qps', default=100, type=int, help='Client QPS') 142argp.add_argument( 143 '--wait_for_backend_sec', 144 default=1200, 145 type=int, 146 help='Time limit for waiting for created backend services to report ' 147 'healthy when launching or updated GCP resources') 148argp.add_argument( 149 '--use_existing_gcp_resources', 150 default=False, 151 action='store_true', 152 help= 153 'If set, find and use already created GCP resources instead of creating new' 154 ' ones.') 155argp.add_argument( 156 '--keep_gcp_resources', 157 default=False, 158 action='store_true', 159 help= 160 'Leave GCP VMs and configuration running after test. Default behavior is ' 161 'to delete when tests complete.') 162argp.add_argument( 163 '--compute_discovery_document', 164 default=None, 165 type=str, 166 help= 167 'If provided, uses this file instead of retrieving via the GCP discovery ' 168 'API') 169argp.add_argument( 170 '--alpha_compute_discovery_document', 171 default=None, 172 type=str, 173 help='If provided, uses this file instead of retrieving via the alpha GCP ' 174 'discovery API') 175argp.add_argument('--network', 176 default='global/networks/default', 177 help='GCP network to use') 178argp.add_argument('--service_port_range', 179 default='8080:8110', 180 type=parse_port_range, 181 help='Listening port for created gRPC backends. Specified as ' 182 'either a single int or as a range in the format min:max, in ' 183 'which case an available port p will be chosen s.t. min <= p ' 184 '<= max') 185argp.add_argument( 186 '--stats_port', 187 default=8079, 188 type=int, 189 help='Local port for the client process to expose the LB stats service') 190argp.add_argument('--xds_server', 191 default='trafficdirector.googleapis.com:443', 192 help='xDS server') 193argp.add_argument('--source_image', 194 default='projects/debian-cloud/global/images/family/debian-9', 195 help='Source image for VMs created during the test') 196argp.add_argument('--path_to_server_binary', 197 default=None, 198 type=str, 199 help='If set, the server binary must already be pre-built on ' 200 'the specified source image') 201argp.add_argument('--machine_type', 202 default='e2-standard-2', 203 help='Machine type for VMs created during the test') 204argp.add_argument( 205 '--instance_group_size', 206 default=2, 207 type=int, 208 help='Number of VMs to create per instance group. Certain test cases (e.g., ' 209 'round_robin) may not give meaningful results if this is set to a value ' 210 'less than 2.') 211argp.add_argument('--verbose', 212 help='verbose log output', 213 default=False, 214 action='store_true') 215# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are 216# visible in all test environments. 217argp.add_argument('--log_client_output', 218 help='Log captured client output', 219 default=False, 220 action='store_true') 221# TODO(ericgribkoff) Remove this flag once all test environments are verified to 222# have access to the alpha compute APIs. 223argp.add_argument('--only_stable_gcp_apis', 224 help='Do not use alpha compute APIs. Some tests may be ' 225 'incompatible with this option (gRPC health checks are ' 226 'currently alpha and required for simulating server failure', 227 default=False, 228 action='store_true') 229args = argp.parse_args() 230 231if args.verbose: 232 logger.setLevel(logging.DEBUG) 233 234CLIENT_HOSTS = [] 235if args.client_hosts: 236 CLIENT_HOSTS = args.client_hosts.split(',') 237 238_DEFAULT_SERVICE_PORT = 80 239_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec 240_WAIT_FOR_OPERATION_SEC = 1200 241_INSTANCE_GROUP_SIZE = args.instance_group_size 242_NUM_TEST_RPCS = 10 * args.qps 243_WAIT_FOR_STATS_SEC = 360 244_WAIT_FOR_VALID_CONFIG_SEC = 60 245_WAIT_FOR_URL_MAP_PATCH_SEC = 300 246_CONNECTION_TIMEOUT_SEC = 60 247_GCP_API_RETRIES = 5 248_BOOTSTRAP_TEMPLATE = """ 249{{ 250 "node": {{ 251 "id": "{node_id}", 252 "metadata": {{ 253 "TRAFFICDIRECTOR_NETWORK_NAME": "%s" 254 }}, 255 "locality": {{ 256 "zone": "%s" 257 }} 258 }}, 259 "xds_servers": [{{ 260 "server_uri": "%s", 261 "channel_creds": [ 262 {{ 263 "type": "google_default", 264 "config": {{}} 265 }} 266 ], 267 "server_features": {server_features} 268 }}] 269}}""" % (args.network.split('/')[-1], args.zone, args.xds_server) 270 271# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer 272# sends an update with no localities when adding the MIG to the backend service 273# can race with the URL map patch. 274_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin'] 275# Tests that run UnaryCall and EmptyCall. 276_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching'] 277# Tests that make UnaryCall with test metadata. 278_TESTS_TO_SEND_METADATA = ['header_matching'] 279_TEST_METADATA_KEY = 'xds_md' 280_TEST_METADATA_VALUE_UNARY = 'unary_yranu' 281_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme' 282_PATH_MATCHER_NAME = 'path-matcher' 283_BASE_TEMPLATE_NAME = 'test-template' 284_BASE_INSTANCE_GROUP_NAME = 'test-ig' 285_BASE_HEALTH_CHECK_NAME = 'test-hc' 286_BASE_FIREWALL_RULE_NAME = 'test-fw-rule' 287_BASE_BACKEND_SERVICE_NAME = 'test-backend-service' 288_BASE_URL_MAP_NAME = 'test-map' 289_BASE_SERVICE_HOST = 'grpc-test' 290_BASE_TARGET_PROXY_NAME = 'test-target-proxy' 291_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule' 292_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 293 '../../reports') 294_SPONGE_LOG_NAME = 'sponge_log.log' 295_SPONGE_XML_NAME = 'sponge_log.xml' 296 297 298def get_client_stats(num_rpcs, timeout_sec): 299 if CLIENT_HOSTS: 300 hosts = CLIENT_HOSTS 301 else: 302 hosts = ['localhost'] 303 for host in hosts: 304 with grpc.insecure_channel('%s:%d' % 305 (host, args.stats_port)) as channel: 306 stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) 307 request = messages_pb2.LoadBalancerStatsRequest() 308 request.num_rpcs = num_rpcs 309 request.timeout_sec = timeout_sec 310 rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC 311 logger.debug('Invoking GetClientStats RPC to %s:%d:', host, 312 args.stats_port) 313 response = stub.GetClientStats(request, 314 wait_for_ready=True, 315 timeout=rpc_timeout) 316 logger.debug('Invoked GetClientStats RPC to %s: %s', host, response) 317 return response 318 319 320def get_client_accumulated_stats(): 321 if CLIENT_HOSTS: 322 hosts = CLIENT_HOSTS 323 else: 324 hosts = ['localhost'] 325 for host in hosts: 326 with grpc.insecure_channel('%s:%d' % 327 (host, args.stats_port)) as channel: 328 stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) 329 request = messages_pb2.LoadBalancerAccumulatedStatsRequest() 330 logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:', 331 host, args.stats_port) 332 response = stub.GetClientAccumulatedStats( 333 request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC) 334 logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s', 335 host, response) 336 return response 337 338 339def configure_client(rpc_types, metadata): 340 if CLIENT_HOSTS: 341 hosts = CLIENT_HOSTS 342 else: 343 hosts = ['localhost'] 344 for host in hosts: 345 with grpc.insecure_channel('%s:%d' % 346 (host, args.stats_port)) as channel: 347 stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel) 348 request = messages_pb2.ClientConfigureRequest() 349 request.types.extend(rpc_types) 350 for rpc_type, md_key, md_value in metadata: 351 md = request.metadata.add() 352 md.type = rpc_type 353 md.key = md_key 354 md.value = md_value 355 logger.debug( 356 'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', 357 host, args.stats_port, request) 358 stub.Configure(request, 359 wait_for_ready=True, 360 timeout=_CONNECTION_TIMEOUT_SEC) 361 logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', 362 host) 363 364 365class RpcDistributionError(Exception): 366 pass 367 368 369def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, 370 allow_failures): 371 start_time = time.time() 372 error_msg = None 373 logger.debug('Waiting for %d sec until backends %s receive load' % 374 (timeout_sec, backends)) 375 while time.time() - start_time <= timeout_sec: 376 error_msg = None 377 stats = get_client_stats(num_rpcs, timeout_sec) 378 rpcs_by_peer = stats.rpcs_by_peer 379 for backend in backends: 380 if backend not in rpcs_by_peer: 381 error_msg = 'Backend %s did not receive load' % backend 382 break 383 if not error_msg and len(rpcs_by_peer) > len(backends): 384 error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer 385 if not allow_failures and stats.num_failures > 0: 386 error_msg = '%d RPCs failed' % stats.num_failures 387 if not error_msg: 388 return 389 raise RpcDistributionError(error_msg) 390 391 392def wait_until_all_rpcs_go_to_given_backends_or_fail(backends, 393 timeout_sec, 394 num_rpcs=_NUM_TEST_RPCS): 395 _verify_rpcs_to_given_backends(backends, 396 timeout_sec, 397 num_rpcs, 398 allow_failures=True) 399 400 401def wait_until_all_rpcs_go_to_given_backends(backends, 402 timeout_sec, 403 num_rpcs=_NUM_TEST_RPCS): 404 _verify_rpcs_to_given_backends(backends, 405 timeout_sec, 406 num_rpcs, 407 allow_failures=False) 408 409 410def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): 411 '''Block until the test client reaches the state with the given number 412 of RPCs being outstanding stably. 413 414 Args: 415 rpc_type: A string indicating the RPC method to check for. Either 416 'UnaryCall' or 'EmptyCall'. 417 timeout_sec: Maximum number of seconds to wait until the desired state 418 is reached. 419 num_rpcs: Expected number of RPCs to be in-flight. 420 threshold: Number within [0,100], the tolerable percentage by which 421 the actual number of RPCs in-flight can differ from the expected number. 422 ''' 423 if threshold < 0 or threshold > 100: 424 raise ValueError('Value error: Threshold should be between 0 to 100') 425 threshold_fraction = threshold / 100.0 426 start_time = time.time() 427 error_msg = None 428 logger.debug( 429 'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' % 430 (timeout_sec, num_rpcs, rpc_type, threshold)) 431 while time.time() - start_time <= timeout_sec: 432 error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, 433 threshold_fraction) 434 if error_msg: 435 logger.debug('Progress: %s', error_msg) 436 time.sleep(2) 437 else: 438 break 439 # Ensure the number of outstanding RPCs is stable. 440 if not error_msg: 441 time.sleep(5) 442 error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, 443 threshold_fraction) 444 if error_msg: 445 raise Exception("Wrong number of %s RPCs in-flight: %s" % 446 (rpc_type, error_msg)) 447 448 449def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): 450 error_msg = None 451 stats = get_client_accumulated_stats() 452 rpcs_started = stats.num_rpcs_started_by_method[rpc_type] 453 rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type] 454 rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type] 455 rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed 456 if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): 457 error_msg = ('actual(%d) < expected(%d - %d%%)' % 458 (rpcs_in_flight, num_rpcs, threshold)) 459 elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)): 460 error_msg = ('actual(%d) > expected(%d + %d%%)' % 461 (rpcs_in_flight, num_rpcs, threshold)) 462 return error_msg 463 464 465def compare_distributions(actual_distribution, expected_distribution, 466 threshold): 467 """Compare if two distributions are similar. 468 469 Args: 470 actual_distribution: A list of floats, contains the actual distribution. 471 expected_distribution: A list of floats, contains the expected distribution. 472 threshold: Number within [0,100], the threshold percentage by which the 473 actual distribution can differ from the expected distribution. 474 475 Returns: 476 The similarity between the distributions as a boolean. Returns true if the 477 actual distribution lies within the threshold of the expected 478 distribution, false otherwise. 479 480 Raises: 481 ValueError: if threshold is not with in [0,100]. 482 Exception: containing detailed error messages. 483 """ 484 if len(expected_distribution) != len(actual_distribution): 485 raise Exception( 486 'Error: expected and actual distributions have different size (%d vs %d)' 487 % (len(expected_distribution), len(actual_distribution))) 488 if threshold < 0 or threshold > 100: 489 raise ValueError('Value error: Threshold should be between 0 to 100') 490 threshold_fraction = threshold / 100.0 491 for expected, actual in zip(expected_distribution, actual_distribution): 492 if actual < (expected * (1 - threshold_fraction)): 493 raise Exception("actual(%f) < expected(%f-%d%%)" % 494 (actual, expected, threshold)) 495 if actual > (expected * (1 + threshold_fraction)): 496 raise Exception("actual(%f) > expected(%f+%d%%)" % 497 (actual, expected, threshold)) 498 return True 499 500 501def compare_expected_instances(stats, expected_instances): 502 """Compare if stats have expected instances for each type of RPC. 503 504 Args: 505 stats: LoadBalancerStatsResponse reported by interop client. 506 expected_instances: a dict with key as the RPC type (string), value as 507 the expected backend instances (list of strings). 508 509 Returns: 510 Returns true if the instances are expected. False if not. 511 """ 512 for rpc_type, expected_peers in expected_instances.items(): 513 rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type] 514 rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None 515 logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer) 516 peers = list(rpcs_by_peer.keys()) 517 if set(peers) != set(expected_peers): 518 logger.info('unexpected peers for %s, got %s, want %s', rpc_type, 519 peers, expected_peers) 520 return False 521 return True 522 523 524def test_backends_restart(gcp, backend_service, instance_group): 525 logger.info('Running test_backends_restart') 526 instance_names = get_instance_names(gcp, instance_group) 527 num_instances = len(instance_names) 528 start_time = time.time() 529 wait_until_all_rpcs_go_to_given_backends(instance_names, 530 _WAIT_FOR_STATS_SEC) 531 try: 532 resize_instance_group(gcp, instance_group, 0) 533 wait_until_all_rpcs_go_to_given_backends_or_fail([], 534 _WAIT_FOR_BACKEND_SEC) 535 finally: 536 resize_instance_group(gcp, instance_group, num_instances) 537 wait_for_healthy_backends(gcp, backend_service, instance_group) 538 new_instance_names = get_instance_names(gcp, instance_group) 539 wait_until_all_rpcs_go_to_given_backends(new_instance_names, 540 _WAIT_FOR_BACKEND_SEC) 541 542 543def test_change_backend_service(gcp, original_backend_service, instance_group, 544 alternate_backend_service, 545 same_zone_instance_group): 546 logger.info('Running test_change_backend_service') 547 original_backend_instances = get_instance_names(gcp, instance_group) 548 alternate_backend_instances = get_instance_names(gcp, 549 same_zone_instance_group) 550 patch_backend_service(gcp, alternate_backend_service, 551 [same_zone_instance_group]) 552 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 553 wait_for_healthy_backends(gcp, alternate_backend_service, 554 same_zone_instance_group) 555 wait_until_all_rpcs_go_to_given_backends(original_backend_instances, 556 _WAIT_FOR_STATS_SEC) 557 try: 558 patch_url_map_backend_service(gcp, alternate_backend_service) 559 wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, 560 _WAIT_FOR_URL_MAP_PATCH_SEC) 561 finally: 562 patch_url_map_backend_service(gcp, original_backend_service) 563 patch_backend_service(gcp, alternate_backend_service, []) 564 565 566def test_gentle_failover(gcp, 567 backend_service, 568 primary_instance_group, 569 secondary_instance_group, 570 swapped_primary_and_secondary=False): 571 logger.info('Running test_gentle_failover') 572 num_primary_instances = len(get_instance_names(gcp, primary_instance_group)) 573 min_instances_for_gentle_failover = 3 # Need >50% failure to start failover 574 try: 575 if num_primary_instances < min_instances_for_gentle_failover: 576 resize_instance_group(gcp, primary_instance_group, 577 min_instances_for_gentle_failover) 578 patch_backend_service( 579 gcp, backend_service, 580 [primary_instance_group, secondary_instance_group]) 581 primary_instance_names = get_instance_names(gcp, primary_instance_group) 582 secondary_instance_names = get_instance_names(gcp, 583 secondary_instance_group) 584 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 585 wait_for_healthy_backends(gcp, backend_service, 586 secondary_instance_group) 587 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 588 _WAIT_FOR_STATS_SEC) 589 instances_to_stop = primary_instance_names[:-1] 590 remaining_instances = primary_instance_names[-1:] 591 try: 592 set_serving_status(instances_to_stop, 593 gcp.service_port, 594 serving=False) 595 wait_until_all_rpcs_go_to_given_backends( 596 remaining_instances + secondary_instance_names, 597 _WAIT_FOR_BACKEND_SEC) 598 finally: 599 set_serving_status(primary_instance_names, 600 gcp.service_port, 601 serving=True) 602 except RpcDistributionError as e: 603 if not swapped_primary_and_secondary and is_primary_instance_group( 604 gcp, secondary_instance_group): 605 # Swap expectation of primary and secondary instance groups. 606 test_gentle_failover(gcp, 607 backend_service, 608 secondary_instance_group, 609 primary_instance_group, 610 swapped_primary_and_secondary=True) 611 else: 612 raise e 613 finally: 614 patch_backend_service(gcp, backend_service, [primary_instance_group]) 615 resize_instance_group(gcp, primary_instance_group, 616 num_primary_instances) 617 instance_names = get_instance_names(gcp, primary_instance_group) 618 wait_until_all_rpcs_go_to_given_backends(instance_names, 619 _WAIT_FOR_BACKEND_SEC) 620 621 622def test_ping_pong(gcp, backend_service, instance_group): 623 logger.info('Running test_ping_pong') 624 wait_for_healthy_backends(gcp, backend_service, instance_group) 625 instance_names = get_instance_names(gcp, instance_group) 626 wait_until_all_rpcs_go_to_given_backends(instance_names, 627 _WAIT_FOR_STATS_SEC) 628 629 630def test_remove_instance_group(gcp, backend_service, instance_group, 631 same_zone_instance_group): 632 logger.info('Running test_remove_instance_group') 633 try: 634 patch_backend_service(gcp, 635 backend_service, 636 [instance_group, same_zone_instance_group], 637 balancing_mode='RATE') 638 wait_for_healthy_backends(gcp, backend_service, instance_group) 639 wait_for_healthy_backends(gcp, backend_service, 640 same_zone_instance_group) 641 instance_names = get_instance_names(gcp, instance_group) 642 same_zone_instance_names = get_instance_names(gcp, 643 same_zone_instance_group) 644 try: 645 wait_until_all_rpcs_go_to_given_backends( 646 instance_names + same_zone_instance_names, 647 _WAIT_FOR_OPERATION_SEC) 648 remaining_instance_group = same_zone_instance_group 649 remaining_instance_names = same_zone_instance_names 650 except RpcDistributionError as e: 651 # If connected to TD in a different zone, we may route traffic to 652 # only one instance group. Determine which group that is to continue 653 # with the remainder of the test case. 654 try: 655 wait_until_all_rpcs_go_to_given_backends( 656 instance_names, _WAIT_FOR_STATS_SEC) 657 remaining_instance_group = same_zone_instance_group 658 remaining_instance_names = same_zone_instance_names 659 except RpcDistributionError as e: 660 wait_until_all_rpcs_go_to_given_backends( 661 same_zone_instance_names, _WAIT_FOR_STATS_SEC) 662 remaining_instance_group = instance_group 663 remaining_instance_names = instance_names 664 patch_backend_service(gcp, 665 backend_service, [remaining_instance_group], 666 balancing_mode='RATE') 667 wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, 668 _WAIT_FOR_BACKEND_SEC) 669 finally: 670 patch_backend_service(gcp, backend_service, [instance_group]) 671 wait_until_all_rpcs_go_to_given_backends(instance_names, 672 _WAIT_FOR_BACKEND_SEC) 673 674 675def test_round_robin(gcp, backend_service, instance_group): 676 logger.info('Running test_round_robin') 677 wait_for_healthy_backends(gcp, backend_service, instance_group) 678 instance_names = get_instance_names(gcp, instance_group) 679 threshold = 1 680 wait_until_all_rpcs_go_to_given_backends(instance_names, 681 _WAIT_FOR_STATS_SEC) 682 # TODO(ericgribkoff) Delayed config propagation from earlier tests 683 # may result in briefly receiving an empty EDS update, resulting in failed 684 # RPCs. Retry distribution validation if this occurs; long-term fix is 685 # creating new backend resources for each individual test case. 686 # Each attempt takes 10 seconds. Config propagation can take several 687 # minutes. 688 max_attempts = 40 689 for i in range(max_attempts): 690 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 691 requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer] 692 total_requests_received = sum(requests_received) 693 if total_requests_received != _NUM_TEST_RPCS: 694 logger.info('Unexpected RPC failures, retrying: %s', stats) 695 continue 696 expected_requests = total_requests_received / len(instance_names) 697 for instance in instance_names: 698 if abs(stats.rpcs_by_peer[instance] - 699 expected_requests) > threshold: 700 raise Exception( 701 'RPC peer distribution differs from expected by more than %d ' 702 'for instance %s (%s)' % (threshold, instance, stats)) 703 return 704 raise Exception('RPC failures persisted through %d retries' % max_attempts) 705 706 707def test_secondary_locality_gets_no_requests_on_partial_primary_failure( 708 gcp, 709 backend_service, 710 primary_instance_group, 711 secondary_instance_group, 712 swapped_primary_and_secondary=False): 713 logger.info( 714 'Running secondary_locality_gets_no_requests_on_partial_primary_failure' 715 ) 716 try: 717 patch_backend_service( 718 gcp, backend_service, 719 [primary_instance_group, secondary_instance_group]) 720 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 721 wait_for_healthy_backends(gcp, backend_service, 722 secondary_instance_group) 723 primary_instance_names = get_instance_names(gcp, primary_instance_group) 724 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 725 _WAIT_FOR_STATS_SEC) 726 instances_to_stop = primary_instance_names[:1] 727 remaining_instances = primary_instance_names[1:] 728 try: 729 set_serving_status(instances_to_stop, 730 gcp.service_port, 731 serving=False) 732 wait_until_all_rpcs_go_to_given_backends(remaining_instances, 733 _WAIT_FOR_BACKEND_SEC) 734 finally: 735 set_serving_status(primary_instance_names, 736 gcp.service_port, 737 serving=True) 738 except RpcDistributionError as e: 739 if not swapped_primary_and_secondary and is_primary_instance_group( 740 gcp, secondary_instance_group): 741 # Swap expectation of primary and secondary instance groups. 742 test_secondary_locality_gets_no_requests_on_partial_primary_failure( 743 gcp, 744 backend_service, 745 secondary_instance_group, 746 primary_instance_group, 747 swapped_primary_and_secondary=True) 748 else: 749 raise e 750 finally: 751 patch_backend_service(gcp, backend_service, [primary_instance_group]) 752 753 754def test_secondary_locality_gets_requests_on_primary_failure( 755 gcp, 756 backend_service, 757 primary_instance_group, 758 secondary_instance_group, 759 swapped_primary_and_secondary=False): 760 logger.info('Running secondary_locality_gets_requests_on_primary_failure') 761 try: 762 patch_backend_service( 763 gcp, backend_service, 764 [primary_instance_group, secondary_instance_group]) 765 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 766 wait_for_healthy_backends(gcp, backend_service, 767 secondary_instance_group) 768 primary_instance_names = get_instance_names(gcp, primary_instance_group) 769 secondary_instance_names = get_instance_names(gcp, 770 secondary_instance_group) 771 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 772 _WAIT_FOR_STATS_SEC) 773 try: 774 set_serving_status(primary_instance_names, 775 gcp.service_port, 776 serving=False) 777 wait_until_all_rpcs_go_to_given_backends(secondary_instance_names, 778 _WAIT_FOR_BACKEND_SEC) 779 finally: 780 set_serving_status(primary_instance_names, 781 gcp.service_port, 782 serving=True) 783 except RpcDistributionError as e: 784 if not swapped_primary_and_secondary and is_primary_instance_group( 785 gcp, secondary_instance_group): 786 # Swap expectation of primary and secondary instance groups. 787 test_secondary_locality_gets_requests_on_primary_failure( 788 gcp, 789 backend_service, 790 secondary_instance_group, 791 primary_instance_group, 792 swapped_primary_and_secondary=True) 793 else: 794 raise e 795 finally: 796 patch_backend_service(gcp, backend_service, [primary_instance_group]) 797 798 799def prepare_services_for_urlmap_tests(gcp, original_backend_service, 800 instance_group, alternate_backend_service, 801 same_zone_instance_group): 802 ''' 803 This function prepares the services to be ready for tests that modifies 804 urlmaps. 805 806 Returns: 807 Returns original and alternate backend names as lists of strings. 808 ''' 809 logger.info('waiting for original backends to become healthy') 810 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 811 812 patch_backend_service(gcp, alternate_backend_service, 813 [same_zone_instance_group]) 814 logger.info('waiting for alternate to become healthy') 815 wait_for_healthy_backends(gcp, alternate_backend_service, 816 same_zone_instance_group) 817 818 original_backend_instances = get_instance_names(gcp, instance_group) 819 logger.info('original backends instances: %s', original_backend_instances) 820 821 alternate_backend_instances = get_instance_names(gcp, 822 same_zone_instance_group) 823 logger.info('alternate backends instances: %s', alternate_backend_instances) 824 825 # Start with all traffic going to original_backend_service. 826 logger.info('waiting for traffic to all go to original backends') 827 wait_until_all_rpcs_go_to_given_backends(original_backend_instances, 828 _WAIT_FOR_STATS_SEC) 829 return original_backend_instances, alternate_backend_instances 830 831 832def test_traffic_splitting(gcp, original_backend_service, instance_group, 833 alternate_backend_service, same_zone_instance_group): 834 # This test start with all traffic going to original_backend_service. Then 835 # it updates URL-map to set default action to traffic splitting between 836 # original and alternate. It waits for all backends in both services to 837 # receive traffic, then verifies that weights are expected. 838 logger.info('Running test_traffic_splitting') 839 840 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 841 gcp, original_backend_service, instance_group, 842 alternate_backend_service, same_zone_instance_group) 843 844 try: 845 # Patch urlmap, change route action to traffic splitting between 846 # original and alternate. 847 logger.info('patching url map with traffic splitting') 848 original_service_percentage, alternate_service_percentage = 20, 80 849 patch_url_map_backend_service( 850 gcp, 851 services_with_weights={ 852 original_backend_service: original_service_percentage, 853 alternate_backend_service: alternate_service_percentage, 854 }) 855 # Split percentage between instances: [20,80] -> [10,10,40,40]. 856 expected_instance_percentage = [ 857 original_service_percentage * 1.0 / len(original_backend_instances) 858 ] * len(original_backend_instances) + [ 859 alternate_service_percentage * 1.0 / 860 len(alternate_backend_instances) 861 ] * len(alternate_backend_instances) 862 863 # Wait for traffic to go to both services. 864 logger.info( 865 'waiting for traffic to go to all backends (including alternate)') 866 wait_until_all_rpcs_go_to_given_backends( 867 original_backend_instances + alternate_backend_instances, 868 _WAIT_FOR_STATS_SEC) 869 870 # Verify that weights between two services are expected. 871 retry_count = 10 872 # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 873 # seconds timeout. 874 for i in range(retry_count): 875 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 876 got_instance_count = [ 877 stats.rpcs_by_peer[i] for i in original_backend_instances 878 ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances] 879 total_count = sum(got_instance_count) 880 got_instance_percentage = [ 881 x * 100.0 / total_count for x in got_instance_count 882 ] 883 884 try: 885 compare_distributions(got_instance_percentage, 886 expected_instance_percentage, 5) 887 except Exception as e: 888 logger.info('attempt %d', i) 889 logger.info('got percentage: %s', got_instance_percentage) 890 logger.info('expected percentage: %s', 891 expected_instance_percentage) 892 logger.info(e) 893 if i == retry_count - 1: 894 raise Exception( 895 'RPC distribution (%s) differs from expected (%s)' % 896 (got_instance_percentage, expected_instance_percentage)) 897 else: 898 logger.info("success") 899 break 900 finally: 901 patch_url_map_backend_service(gcp, original_backend_service) 902 patch_backend_service(gcp, alternate_backend_service, []) 903 904 905def test_path_matching(gcp, original_backend_service, instance_group, 906 alternate_backend_service, same_zone_instance_group): 907 # This test start with all traffic (UnaryCall and EmptyCall) going to 908 # original_backend_service. 909 # 910 # Then it updates URL-map to add routes, to make UnaryCall and EmptyCall to 911 # go different backends. It waits for all backends in both services to 912 # receive traffic, then verifies that traffic goes to the expected 913 # backends. 914 logger.info('Running test_path_matching') 915 916 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 917 gcp, original_backend_service, instance_group, 918 alternate_backend_service, same_zone_instance_group) 919 920 try: 921 # A list of tuples (route_rules, expected_instances). 922 test_cases = [ 923 ( 924 [{ 925 'priority': 0, 926 # FullPath EmptyCall -> alternate_backend_service. 927 'matchRules': [{ 928 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' 929 }], 930 'service': alternate_backend_service.url 931 }], 932 { 933 "EmptyCall": alternate_backend_instances, 934 "UnaryCall": original_backend_instances 935 }), 936 ( 937 [{ 938 'priority': 0, 939 # Prefix UnaryCall -> alternate_backend_service. 940 'matchRules': [{ 941 'prefixMatch': '/grpc.testing.TestService/Unary' 942 }], 943 'service': alternate_backend_service.url 944 }], 945 { 946 "UnaryCall": alternate_backend_instances, 947 "EmptyCall": original_backend_instances 948 }), 949 ( 950 # This test case is similar to the one above (but with route 951 # services swapped). This test has two routes (full_path and 952 # the default) to match EmptyCall, and both routes set 953 # alternative_backend_service as the action. This forces the 954 # client to handle duplicate Clusters in the RDS response. 955 [ 956 { 957 'priority': 0, 958 # Prefix UnaryCall -> original_backend_service. 959 'matchRules': [{ 960 'prefixMatch': '/grpc.testing.TestService/Unary' 961 }], 962 'service': original_backend_service.url 963 }, 964 { 965 'priority': 1, 966 # FullPath EmptyCall -> alternate_backend_service. 967 'matchRules': [{ 968 'fullPathMatch': 969 '/grpc.testing.TestService/EmptyCall' 970 }], 971 'service': alternate_backend_service.url 972 } 973 ], 974 { 975 "UnaryCall": original_backend_instances, 976 "EmptyCall": alternate_backend_instances 977 }) 978 ] 979 980 for (route_rules, expected_instances) in test_cases: 981 logger.info('patching url map with %s', route_rules) 982 patch_url_map_backend_service(gcp, 983 original_backend_service, 984 route_rules=route_rules) 985 986 # Wait for traffic to go to both services. 987 logger.info( 988 'waiting for traffic to go to all backends (including alternate)' 989 ) 990 wait_until_all_rpcs_go_to_given_backends( 991 original_backend_instances + alternate_backend_instances, 992 _WAIT_FOR_STATS_SEC) 993 994 retry_count = 20 995 # Each attempt takes about 10 seconds, 20 retries is equivalent to 200 996 # seconds timeout. 997 for i in range(retry_count): 998 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 999 if not stats.rpcs_by_method: 1000 raise ValueError( 1001 'stats.rpcs_by_method is None, the interop client stats service does not support this test case' 1002 ) 1003 logger.info('attempt %d', i) 1004 if compare_expected_instances(stats, expected_instances): 1005 logger.info("success") 1006 break 1007 finally: 1008 patch_url_map_backend_service(gcp, original_backend_service) 1009 patch_backend_service(gcp, alternate_backend_service, []) 1010 1011 1012def test_header_matching(gcp, original_backend_service, instance_group, 1013 alternate_backend_service, same_zone_instance_group): 1014 # This test start with all traffic (UnaryCall and EmptyCall) going to 1015 # original_backend_service. 1016 # 1017 # Then it updates URL-map to add routes, to make RPCs with test headers to 1018 # go to different backends. It waits for all backends in both services to 1019 # receive traffic, then verifies that traffic goes to the expected 1020 # backends. 1021 logger.info('Running test_header_matching') 1022 1023 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 1024 gcp, original_backend_service, instance_group, 1025 alternate_backend_service, same_zone_instance_group) 1026 1027 try: 1028 # A list of tuples (route_rules, expected_instances). 1029 test_cases = [ 1030 ( 1031 [{ 1032 'priority': 0, 1033 # Header ExactMatch -> alternate_backend_service. 1034 # EmptyCall is sent with the metadata. 1035 'matchRules': [{ 1036 'prefixMatch': 1037 '/', 1038 'headerMatches': [{ 1039 'headerName': _TEST_METADATA_KEY, 1040 'exactMatch': _TEST_METADATA_VALUE_EMPTY 1041 }] 1042 }], 1043 'service': alternate_backend_service.url 1044 }], 1045 { 1046 "EmptyCall": alternate_backend_instances, 1047 "UnaryCall": original_backend_instances 1048 }), 1049 ( 1050 [{ 1051 'priority': 0, 1052 # Header PrefixMatch -> alternate_backend_service. 1053 # UnaryCall is sent with the metadata. 1054 'matchRules': [{ 1055 'prefixMatch': 1056 '/', 1057 'headerMatches': [{ 1058 'headerName': _TEST_METADATA_KEY, 1059 'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2] 1060 }] 1061 }], 1062 'service': alternate_backend_service.url 1063 }], 1064 { 1065 "EmptyCall": original_backend_instances, 1066 "UnaryCall": alternate_backend_instances 1067 }), 1068 ( 1069 [{ 1070 'priority': 0, 1071 # Header SuffixMatch -> alternate_backend_service. 1072 # EmptyCall is sent with the metadata. 1073 'matchRules': [{ 1074 'prefixMatch': 1075 '/', 1076 'headerMatches': [{ 1077 'headerName': _TEST_METADATA_KEY, 1078 'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:] 1079 }] 1080 }], 1081 'service': alternate_backend_service.url 1082 }], 1083 { 1084 "EmptyCall": alternate_backend_instances, 1085 "UnaryCall": original_backend_instances 1086 }), 1087 ( 1088 [{ 1089 'priority': 0, 1090 # Header invert ExactMatch -> alternate_backend_service. 1091 # EmptyCall is sent with the metadata, so will be sent to original. 1092 'matchRules': [{ 1093 'prefixMatch': 1094 '/', 1095 'headerMatches': [{ 1096 'headerName': _TEST_METADATA_KEY, 1097 'exactMatch': _TEST_METADATA_VALUE_EMPTY, 1098 'invertMatch': True 1099 }] 1100 }], 1101 'service': alternate_backend_service.url 1102 }], 1103 { 1104 "EmptyCall": original_backend_instances, 1105 "UnaryCall": alternate_backend_instances 1106 }), 1107 ] 1108 1109 for (route_rules, expected_instances) in test_cases: 1110 logger.info('patching url map with %s -> alternative', 1111 route_rules[0]['matchRules']) 1112 patch_url_map_backend_service(gcp, 1113 original_backend_service, 1114 route_rules=route_rules) 1115 1116 # Wait for traffic to go to both services. 1117 logger.info( 1118 'waiting for traffic to go to all backends (including alternate)' 1119 ) 1120 wait_until_all_rpcs_go_to_given_backends( 1121 original_backend_instances + alternate_backend_instances, 1122 _WAIT_FOR_STATS_SEC) 1123 1124 retry_count = 20 1125 # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 1126 # seconds timeout. 1127 for i in range(retry_count): 1128 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 1129 if not stats.rpcs_by_method: 1130 raise ValueError( 1131 'stats.rpcs_by_method is None, the interop client stats service does not support this test case' 1132 ) 1133 logger.info('attempt %d', i) 1134 if compare_expected_instances(stats, expected_instances): 1135 logger.info("success") 1136 break 1137 finally: 1138 patch_url_map_backend_service(gcp, original_backend_service) 1139 patch_backend_service(gcp, alternate_backend_service, []) 1140 1141 1142def test_circuit_breaking(gcp, original_backend_service, instance_group, 1143 same_zone_instance_group): 1144 ''' 1145 Since backend service circuit_breakers configuration cannot be unset, 1146 which causes trouble for restoring validate_for_proxy flag in target 1147 proxy/global forwarding rule. This test uses dedicated backend sevices. 1148 The url_map and backend services undergoes the following state changes: 1149 1150 Before test: 1151 original_backend_service -> [instance_group] 1152 extra_backend_service -> [] 1153 more_extra_backend_service -> [] 1154 1155 url_map -> [original_backend_service] 1156 1157 In test: 1158 extra_backend_service (with circuit_breakers) -> [instance_group] 1159 more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group] 1160 1161 url_map -> [extra_backend_service, more_extra_backend_service] 1162 1163 After test: 1164 original_backend_service -> [instance_group] 1165 extra_backend_service (with circuit_breakers) -> [] 1166 more_extra_backend_service (with circuit_breakers) -> [] 1167 1168 url_map -> [original_backend_service] 1169 ''' 1170 logger.info('Running test_circuit_breaking') 1171 additional_backend_services = [] 1172 try: 1173 # TODO(chengyuanzhang): Dedicated backend services created for circuit 1174 # breaking test. Once the issue for unsetting backend service circuit 1175 # breakers is resolved or configuring backend service circuit breakers is 1176 # enabled for config validation, these dedicated backend services can be 1177 # eliminated. 1178 extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix 1179 more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix 1180 extra_backend_service = add_backend_service(gcp, 1181 extra_backend_service_name) 1182 additional_backend_services.append(extra_backend_service) 1183 more_extra_backend_service = add_backend_service( 1184 gcp, more_extra_backend_service_name) 1185 additional_backend_services.append(more_extra_backend_service) 1186 # The config validation for proxyless doesn't allow setting 1187 # circuit_breakers. Disable validate validate_for_proxyless 1188 # for this test. This can be removed when validation 1189 # accepts circuit_breakers. 1190 logger.info('disabling validate_for_proxyless in target proxy') 1191 set_validate_for_proxyless(gcp, False) 1192 extra_backend_service_max_requests = 500 1193 more_extra_backend_service_max_requests = 1000 1194 patch_backend_service(gcp, 1195 extra_backend_service, [instance_group], 1196 circuit_breakers={ 1197 'maxRequests': 1198 extra_backend_service_max_requests 1199 }) 1200 logger.info('Waiting for extra backends to become healthy') 1201 wait_for_healthy_backends(gcp, extra_backend_service, instance_group) 1202 patch_backend_service(gcp, 1203 more_extra_backend_service, 1204 [same_zone_instance_group], 1205 circuit_breakers={ 1206 'maxRequests': 1207 more_extra_backend_service_max_requests 1208 }) 1209 logger.info('Waiting for more extra backend to become healthy') 1210 wait_for_healthy_backends(gcp, more_extra_backend_service, 1211 same_zone_instance_group) 1212 extra_backend_instances = get_instance_names(gcp, instance_group) 1213 more_extra_backend_instances = get_instance_names( 1214 gcp, same_zone_instance_group) 1215 route_rules = [ 1216 { 1217 'priority': 0, 1218 # UnaryCall -> extra_backend_service 1219 'matchRules': [{ 1220 'fullPathMatch': '/grpc.testing.TestService/UnaryCall' 1221 }], 1222 'service': extra_backend_service.url 1223 }, 1224 { 1225 'priority': 1, 1226 # EmptyCall -> more_extra_backend_service 1227 'matchRules': [{ 1228 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' 1229 }], 1230 'service': more_extra_backend_service.url 1231 }, 1232 ] 1233 1234 # Make client send UNARY_CALL and EMPTY_CALL. 1235 configure_client([ 1236 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1237 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL 1238 ], []) 1239 logger.info('Patching url map with %s', route_rules) 1240 patch_url_map_backend_service(gcp, 1241 extra_backend_service, 1242 route_rules=route_rules) 1243 logger.info('Waiting for traffic to go to all backends') 1244 wait_until_all_rpcs_go_to_given_backends( 1245 extra_backend_instances + more_extra_backend_instances, 1246 _WAIT_FOR_STATS_SEC) 1247 1248 # Make all calls keep-open. 1249 configure_client([ 1250 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1251 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL 1252 ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1253 'rpc-behavior', 'keep-open'), 1254 (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 1255 'rpc-behavior', 'keep-open')]) 1256 wait_until_rpcs_in_flight( 1257 'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC + 1258 int(extra_backend_service_max_requests / args.qps)), 1259 extra_backend_service_max_requests, 1) 1260 logger.info('UNARY_CALL reached stable state (%d)', 1261 extra_backend_service_max_requests) 1262 wait_until_rpcs_in_flight( 1263 'EMPTY_CALL', 1264 (_WAIT_FOR_BACKEND_SEC + 1265 int(more_extra_backend_service_max_requests / args.qps)), 1266 more_extra_backend_service_max_requests, 1) 1267 logger.info('EMPTY_CALL reached stable state (%d)', 1268 more_extra_backend_service_max_requests) 1269 1270 # Increment circuit breakers max_requests threshold. 1271 extra_backend_service_max_requests = 800 1272 patch_backend_service(gcp, 1273 extra_backend_service, [instance_group], 1274 circuit_breakers={ 1275 'maxRequests': 1276 extra_backend_service_max_requests 1277 }) 1278 wait_until_rpcs_in_flight( 1279 'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC + 1280 int(extra_backend_service_max_requests / args.qps)), 1281 extra_backend_service_max_requests, 1) 1282 logger.info('UNARY_CALL reached stable state after increase (%d)', 1283 extra_backend_service_max_requests) 1284 logger.info('success') 1285 # Avoid new RPCs being outstanding (some test clients create threads 1286 # for sending RPCs) after restoring backend services. 1287 configure_client( 1288 [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], []) 1289 finally: 1290 patch_url_map_backend_service(gcp, original_backend_service) 1291 patch_backend_service(gcp, original_backend_service, [instance_group]) 1292 for backend_service in additional_backend_services: 1293 delete_backend_service(gcp, backend_service) 1294 set_validate_for_proxyless(gcp, True) 1295 1296 1297def set_validate_for_proxyless(gcp, validate_for_proxyless): 1298 if not gcp.alpha_compute: 1299 logger.debug( 1300 'Not setting validateForProxy because alpha is not enabled') 1301 return 1302 # This function deletes global_forwarding_rule and target_proxy, then 1303 # recreate target_proxy with validateForProxyless=False. This is necessary 1304 # because patching target_grpc_proxy isn't supported. 1305 delete_global_forwarding_rule(gcp) 1306 delete_target_proxy(gcp) 1307 create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless) 1308 create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name, 1309 [gcp.service_port]) 1310 1311 1312def get_serving_status(instance, service_port): 1313 with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: 1314 health_stub = health_pb2_grpc.HealthStub(channel) 1315 return health_stub.Check(health_pb2.HealthCheckRequest()) 1316 1317 1318def set_serving_status(instances, service_port, serving): 1319 logger.info('setting %s serving status to %s', instances, serving) 1320 for instance in instances: 1321 with grpc.insecure_channel('%s:%d' % 1322 (instance, service_port)) as channel: 1323 logger.info('setting %s serving status to %s', instance, serving) 1324 stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel) 1325 retry_count = 5 1326 for i in range(5): 1327 if serving: 1328 stub.SetServing(empty_pb2.Empty()) 1329 else: 1330 stub.SetNotServing(empty_pb2.Empty()) 1331 serving_status = get_serving_status(instance, service_port) 1332 logger.info('got instance service status %s', serving_status) 1333 want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING 1334 if serving_status.status == want_status: 1335 break 1336 if i == retry_count - 1: 1337 raise Exception( 1338 'failed to set instance service status after %d retries' 1339 % retry_count) 1340 1341 1342def is_primary_instance_group(gcp, instance_group): 1343 # Clients may connect to a TD instance in a different region than the 1344 # client, in which case primary/secondary assignments may not be based on 1345 # the client's actual locality. 1346 instance_names = get_instance_names(gcp, instance_group) 1347 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 1348 return all(peer in instance_names for peer in stats.rpcs_by_peer.keys()) 1349 1350 1351def get_startup_script(path_to_server_binary, service_port): 1352 if path_to_server_binary: 1353 return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary, 1354 service_port) 1355 else: 1356 return """#!/bin/bash 1357sudo apt update 1358sudo apt install -y git default-jdk 1359mkdir java_server 1360pushd java_server 1361git clone https://github.com/grpc/grpc-java.git 1362pushd grpc-java 1363pushd interop-testing 1364../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true 1365 1366nohup build/install/grpc-interop-testing/bin/xds-test-server \ 1367 --port=%d 1>/dev/null &""" % service_port 1368 1369 1370def create_instance_template(gcp, name, network, source_image, machine_type, 1371 startup_script): 1372 config = { 1373 'name': name, 1374 'properties': { 1375 'tags': { 1376 'items': ['allow-health-checks'] 1377 }, 1378 'machineType': machine_type, 1379 'serviceAccounts': [{ 1380 'email': 'default', 1381 'scopes': ['https://www.googleapis.com/auth/cloud-platform',] 1382 }], 1383 'networkInterfaces': [{ 1384 'accessConfigs': [{ 1385 'type': 'ONE_TO_ONE_NAT' 1386 }], 1387 'network': network 1388 }], 1389 'disks': [{ 1390 'boot': True, 1391 'initializeParams': { 1392 'sourceImage': source_image 1393 } 1394 }], 1395 'metadata': { 1396 'items': [{ 1397 'key': 'startup-script', 1398 'value': startup_script 1399 }] 1400 } 1401 } 1402 } 1403 1404 logger.debug('Sending GCP request with body=%s', config) 1405 result = gcp.compute.instanceTemplates().insert( 1406 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1407 wait_for_global_operation(gcp, result['name']) 1408 gcp.instance_template = GcpResource(config['name'], result['targetLink']) 1409 1410 1411def add_instance_group(gcp, zone, name, size): 1412 config = { 1413 'name': name, 1414 'instanceTemplate': gcp.instance_template.url, 1415 'targetSize': size, 1416 'namedPorts': [{ 1417 'name': 'grpc', 1418 'port': gcp.service_port 1419 }] 1420 } 1421 1422 logger.debug('Sending GCP request with body=%s', config) 1423 result = gcp.compute.instanceGroupManagers().insert( 1424 project=gcp.project, zone=zone, 1425 body=config).execute(num_retries=_GCP_API_RETRIES) 1426 wait_for_zone_operation(gcp, zone, result['name']) 1427 result = gcp.compute.instanceGroupManagers().get( 1428 project=gcp.project, zone=zone, 1429 instanceGroupManager=config['name']).execute( 1430 num_retries=_GCP_API_RETRIES) 1431 instance_group = InstanceGroup(config['name'], result['instanceGroup'], 1432 zone) 1433 gcp.instance_groups.append(instance_group) 1434 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size, 1435 _WAIT_FOR_OPERATION_SEC) 1436 return instance_group 1437 1438 1439def create_health_check(gcp, name): 1440 if gcp.alpha_compute: 1441 config = { 1442 'name': name, 1443 'type': 'GRPC', 1444 'grpcHealthCheck': { 1445 'portSpecification': 'USE_SERVING_PORT' 1446 } 1447 } 1448 compute_to_use = gcp.alpha_compute 1449 else: 1450 config = { 1451 'name': name, 1452 'type': 'TCP', 1453 'tcpHealthCheck': { 1454 'portName': 'grpc' 1455 } 1456 } 1457 compute_to_use = gcp.compute 1458 logger.debug('Sending GCP request with body=%s', config) 1459 result = compute_to_use.healthChecks().insert( 1460 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1461 wait_for_global_operation(gcp, result['name']) 1462 gcp.health_check = GcpResource(config['name'], result['targetLink']) 1463 1464 1465def create_health_check_firewall_rule(gcp, name): 1466 config = { 1467 'name': name, 1468 'direction': 'INGRESS', 1469 'allowed': [{ 1470 'IPProtocol': 'tcp' 1471 }], 1472 'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'], 1473 'targetTags': ['allow-health-checks'], 1474 } 1475 logger.debug('Sending GCP request with body=%s', config) 1476 result = gcp.compute.firewalls().insert( 1477 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1478 wait_for_global_operation(gcp, result['name']) 1479 gcp.health_check_firewall_rule = GcpResource(config['name'], 1480 result['targetLink']) 1481 1482 1483def add_backend_service(gcp, name): 1484 if gcp.alpha_compute: 1485 protocol = 'GRPC' 1486 compute_to_use = gcp.alpha_compute 1487 else: 1488 protocol = 'HTTP2' 1489 compute_to_use = gcp.compute 1490 config = { 1491 'name': name, 1492 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 1493 'healthChecks': [gcp.health_check.url], 1494 'portName': 'grpc', 1495 'protocol': protocol 1496 } 1497 logger.debug('Sending GCP request with body=%s', config) 1498 result = compute_to_use.backendServices().insert( 1499 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1500 wait_for_global_operation(gcp, result['name']) 1501 backend_service = GcpResource(config['name'], result['targetLink']) 1502 gcp.backend_services.append(backend_service) 1503 return backend_service 1504 1505 1506def create_url_map(gcp, name, backend_service, host_name): 1507 config = { 1508 'name': name, 1509 'defaultService': backend_service.url, 1510 'pathMatchers': [{ 1511 'name': _PATH_MATCHER_NAME, 1512 'defaultService': backend_service.url, 1513 }], 1514 'hostRules': [{ 1515 'hosts': [host_name], 1516 'pathMatcher': _PATH_MATCHER_NAME 1517 }] 1518 } 1519 logger.debug('Sending GCP request with body=%s', config) 1520 result = gcp.compute.urlMaps().insert( 1521 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1522 wait_for_global_operation(gcp, result['name']) 1523 gcp.url_map = GcpResource(config['name'], result['targetLink']) 1524 1525 1526def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name): 1527 config = { 1528 'hostRules': [{ 1529 'hosts': ['%s:%d' % (host_name, gcp.service_port)], 1530 'pathMatcher': _PATH_MATCHER_NAME 1531 }] 1532 } 1533 logger.debug('Sending GCP request with body=%s', config) 1534 result = gcp.compute.urlMaps().patch( 1535 project=gcp.project, urlMap=name, 1536 body=config).execute(num_retries=_GCP_API_RETRIES) 1537 wait_for_global_operation(gcp, result['name']) 1538 1539 1540def create_target_proxy(gcp, name, validate_for_proxyless=True): 1541 if gcp.alpha_compute: 1542 config = { 1543 'name': name, 1544 'url_map': gcp.url_map.url, 1545 'validate_for_proxyless': validate_for_proxyless 1546 } 1547 logger.debug('Sending GCP request with body=%s', config) 1548 result = gcp.alpha_compute.targetGrpcProxies().insert( 1549 project=gcp.project, 1550 body=config).execute(num_retries=_GCP_API_RETRIES) 1551 else: 1552 config = { 1553 'name': name, 1554 'url_map': gcp.url_map.url, 1555 } 1556 logger.debug('Sending GCP request with body=%s', config) 1557 result = gcp.compute.targetHttpProxies().insert( 1558 project=gcp.project, 1559 body=config).execute(num_retries=_GCP_API_RETRIES) 1560 wait_for_global_operation(gcp, result['name']) 1561 gcp.target_proxy = GcpResource(config['name'], result['targetLink']) 1562 1563 1564def create_global_forwarding_rule(gcp, name, potential_ports): 1565 if gcp.alpha_compute: 1566 compute_to_use = gcp.alpha_compute 1567 else: 1568 compute_to_use = gcp.compute 1569 for port in potential_ports: 1570 try: 1571 config = { 1572 'name': name, 1573 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 1574 'portRange': str(port), 1575 'IPAddress': '0.0.0.0', 1576 'network': args.network, 1577 'target': gcp.target_proxy.url, 1578 } 1579 logger.debug('Sending GCP request with body=%s', config) 1580 result = compute_to_use.globalForwardingRules().insert( 1581 project=gcp.project, 1582 body=config).execute(num_retries=_GCP_API_RETRIES) 1583 wait_for_global_operation(gcp, result['name']) 1584 gcp.global_forwarding_rule = GcpResource(config['name'], 1585 result['targetLink']) 1586 gcp.service_port = port 1587 return 1588 except googleapiclient.errors.HttpError as http_error: 1589 logger.warning( 1590 'Got error %s when attempting to create forwarding rule to ' 1591 '0.0.0.0:%d. Retrying with another port.' % (http_error, port)) 1592 1593 1594def get_health_check(gcp, health_check_name): 1595 result = gcp.compute.healthChecks().get( 1596 project=gcp.project, healthCheck=health_check_name).execute() 1597 gcp.health_check = GcpResource(health_check_name, result['selfLink']) 1598 1599 1600def get_health_check_firewall_rule(gcp, firewall_name): 1601 result = gcp.compute.firewalls().get(project=gcp.project, 1602 firewall=firewall_name).execute() 1603 gcp.health_check_firewall_rule = GcpResource(firewall_name, 1604 result['selfLink']) 1605 1606 1607def get_backend_service(gcp, backend_service_name): 1608 result = gcp.compute.backendServices().get( 1609 project=gcp.project, backendService=backend_service_name).execute() 1610 backend_service = GcpResource(backend_service_name, result['selfLink']) 1611 gcp.backend_services.append(backend_service) 1612 return backend_service 1613 1614 1615def get_url_map(gcp, url_map_name): 1616 result = gcp.compute.urlMaps().get(project=gcp.project, 1617 urlMap=url_map_name).execute() 1618 gcp.url_map = GcpResource(url_map_name, result['selfLink']) 1619 1620 1621def get_target_proxy(gcp, target_proxy_name): 1622 if gcp.alpha_compute: 1623 result = gcp.alpha_compute.targetGrpcProxies().get( 1624 project=gcp.project, targetGrpcProxy=target_proxy_name).execute() 1625 else: 1626 result = gcp.compute.targetHttpProxies().get( 1627 project=gcp.project, targetHttpProxy=target_proxy_name).execute() 1628 gcp.target_proxy = GcpResource(target_proxy_name, result['selfLink']) 1629 1630 1631def get_global_forwarding_rule(gcp, forwarding_rule_name): 1632 result = gcp.compute.globalForwardingRules().get( 1633 project=gcp.project, forwardingRule=forwarding_rule_name).execute() 1634 gcp.global_forwarding_rule = GcpResource(forwarding_rule_name, 1635 result['selfLink']) 1636 1637 1638def get_instance_template(gcp, template_name): 1639 result = gcp.compute.instanceTemplates().get( 1640 project=gcp.project, instanceTemplate=template_name).execute() 1641 gcp.instance_template = GcpResource(template_name, result['selfLink']) 1642 1643 1644def get_instance_group(gcp, zone, instance_group_name): 1645 result = gcp.compute.instanceGroups().get( 1646 project=gcp.project, zone=zone, 1647 instanceGroup=instance_group_name).execute() 1648 gcp.service_port = result['namedPorts'][0]['port'] 1649 instance_group = InstanceGroup(instance_group_name, result['selfLink'], 1650 zone) 1651 gcp.instance_groups.append(instance_group) 1652 return instance_group 1653 1654 1655def delete_global_forwarding_rule(gcp): 1656 try: 1657 result = gcp.compute.globalForwardingRules().delete( 1658 project=gcp.project, 1659 forwardingRule=gcp.global_forwarding_rule.name).execute( 1660 num_retries=_GCP_API_RETRIES) 1661 wait_for_global_operation(gcp, result['name']) 1662 except googleapiclient.errors.HttpError as http_error: 1663 logger.info('Delete failed: %s', http_error) 1664 1665 1666def delete_target_proxy(gcp): 1667 try: 1668 if gcp.alpha_compute: 1669 result = gcp.alpha_compute.targetGrpcProxies().delete( 1670 project=gcp.project, 1671 targetGrpcProxy=gcp.target_proxy.name).execute( 1672 num_retries=_GCP_API_RETRIES) 1673 else: 1674 result = gcp.compute.targetHttpProxies().delete( 1675 project=gcp.project, 1676 targetHttpProxy=gcp.target_proxy.name).execute( 1677 num_retries=_GCP_API_RETRIES) 1678 wait_for_global_operation(gcp, result['name']) 1679 except googleapiclient.errors.HttpError as http_error: 1680 logger.info('Delete failed: %s', http_error) 1681 1682 1683def delete_url_map(gcp): 1684 try: 1685 result = gcp.compute.urlMaps().delete( 1686 project=gcp.project, 1687 urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES) 1688 wait_for_global_operation(gcp, result['name']) 1689 except googleapiclient.errors.HttpError as http_error: 1690 logger.info('Delete failed: %s', http_error) 1691 1692 1693def delete_backend_service(gcp, backend_service): 1694 try: 1695 result = gcp.compute.backendServices().delete( 1696 project=gcp.project, backendService=backend_service.name).execute( 1697 num_retries=_GCP_API_RETRIES) 1698 wait_for_global_operation(gcp, result['name']) 1699 except googleapiclient.errors.HttpError as http_error: 1700 logger.info('Delete failed: %s', http_error) 1701 1702 1703def delete_backend_services(gcp): 1704 for backend_service in gcp.backend_services: 1705 delete_backend_service(gcp, backend_service) 1706 1707 1708def delete_firewall(gcp): 1709 try: 1710 result = gcp.compute.firewalls().delete( 1711 project=gcp.project, 1712 firewall=gcp.health_check_firewall_rule.name).execute( 1713 num_retries=_GCP_API_RETRIES) 1714 wait_for_global_operation(gcp, result['name']) 1715 except googleapiclient.errors.HttpError as http_error: 1716 logger.info('Delete failed: %s', http_error) 1717 1718 1719def delete_health_check(gcp): 1720 try: 1721 result = gcp.compute.healthChecks().delete( 1722 project=gcp.project, healthCheck=gcp.health_check.name).execute( 1723 num_retries=_GCP_API_RETRIES) 1724 wait_for_global_operation(gcp, result['name']) 1725 except googleapiclient.errors.HttpError as http_error: 1726 logger.info('Delete failed: %s', http_error) 1727 1728 1729def delete_instance_groups(gcp): 1730 for instance_group in gcp.instance_groups: 1731 try: 1732 result = gcp.compute.instanceGroupManagers().delete( 1733 project=gcp.project, 1734 zone=instance_group.zone, 1735 instanceGroupManager=instance_group.name).execute( 1736 num_retries=_GCP_API_RETRIES) 1737 wait_for_zone_operation(gcp, 1738 instance_group.zone, 1739 result['name'], 1740 timeout_sec=_WAIT_FOR_BACKEND_SEC) 1741 except googleapiclient.errors.HttpError as http_error: 1742 logger.info('Delete failed: %s', http_error) 1743 1744 1745def delete_instance_template(gcp): 1746 try: 1747 result = gcp.compute.instanceTemplates().delete( 1748 project=gcp.project, 1749 instanceTemplate=gcp.instance_template.name).execute( 1750 num_retries=_GCP_API_RETRIES) 1751 wait_for_global_operation(gcp, result['name']) 1752 except googleapiclient.errors.HttpError as http_error: 1753 logger.info('Delete failed: %s', http_error) 1754 1755 1756def patch_backend_service(gcp, 1757 backend_service, 1758 instance_groups, 1759 balancing_mode='UTILIZATION', 1760 circuit_breakers=None): 1761 if gcp.alpha_compute: 1762 compute_to_use = gcp.alpha_compute 1763 else: 1764 compute_to_use = gcp.compute 1765 config = { 1766 'backends': [{ 1767 'group': instance_group.url, 1768 'balancingMode': balancing_mode, 1769 'maxRate': 1 if balancing_mode == 'RATE' else None 1770 } for instance_group in instance_groups], 1771 'circuitBreakers': circuit_breakers, 1772 } 1773 logger.debug('Sending GCP request with body=%s', config) 1774 result = compute_to_use.backendServices().patch( 1775 project=gcp.project, backendService=backend_service.name, 1776 body=config).execute(num_retries=_GCP_API_RETRIES) 1777 wait_for_global_operation(gcp, 1778 result['name'], 1779 timeout_sec=_WAIT_FOR_BACKEND_SEC) 1780 1781 1782def resize_instance_group(gcp, 1783 instance_group, 1784 new_size, 1785 timeout_sec=_WAIT_FOR_OPERATION_SEC): 1786 result = gcp.compute.instanceGroupManagers().resize( 1787 project=gcp.project, 1788 zone=instance_group.zone, 1789 instanceGroupManager=instance_group.name, 1790 size=new_size).execute(num_retries=_GCP_API_RETRIES) 1791 wait_for_zone_operation(gcp, 1792 instance_group.zone, 1793 result['name'], 1794 timeout_sec=360) 1795 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, 1796 new_size, timeout_sec) 1797 1798 1799def patch_url_map_backend_service(gcp, 1800 backend_service=None, 1801 services_with_weights=None, 1802 route_rules=None): 1803 '''change url_map's backend service 1804 1805 Only one of backend_service and service_with_weights can be not None. 1806 ''' 1807 if backend_service and services_with_weights: 1808 raise ValueError( 1809 'both backend_service and service_with_weights are not None.') 1810 1811 default_service = backend_service.url if backend_service else None 1812 default_route_action = { 1813 'weightedBackendServices': [{ 1814 'backendService': service.url, 1815 'weight': w, 1816 } for service, w in services_with_weights.items()] 1817 } if services_with_weights else None 1818 1819 config = { 1820 'pathMatchers': [{ 1821 'name': _PATH_MATCHER_NAME, 1822 'defaultService': default_service, 1823 'defaultRouteAction': default_route_action, 1824 'routeRules': route_rules, 1825 }] 1826 } 1827 logger.debug('Sending GCP request with body=%s', config) 1828 result = gcp.compute.urlMaps().patch( 1829 project=gcp.project, urlMap=gcp.url_map.name, 1830 body=config).execute(num_retries=_GCP_API_RETRIES) 1831 wait_for_global_operation(gcp, result['name']) 1832 1833 1834def wait_for_instance_group_to_reach_expected_size(gcp, instance_group, 1835 expected_size, timeout_sec): 1836 start_time = time.time() 1837 while True: 1838 current_size = len(get_instance_names(gcp, instance_group)) 1839 if current_size == expected_size: 1840 break 1841 if time.time() - start_time > timeout_sec: 1842 raise Exception( 1843 'Instance group had expected size %d but actual size %d' % 1844 (expected_size, current_size)) 1845 time.sleep(2) 1846 1847 1848def wait_for_global_operation(gcp, 1849 operation, 1850 timeout_sec=_WAIT_FOR_OPERATION_SEC): 1851 start_time = time.time() 1852 while time.time() - start_time <= timeout_sec: 1853 result = gcp.compute.globalOperations().get( 1854 project=gcp.project, 1855 operation=operation).execute(num_retries=_GCP_API_RETRIES) 1856 if result['status'] == 'DONE': 1857 if 'error' in result: 1858 raise Exception(result['error']) 1859 return 1860 time.sleep(2) 1861 raise Exception('Operation %s did not complete within %d' % 1862 (operation, timeout_sec)) 1863 1864 1865def wait_for_zone_operation(gcp, 1866 zone, 1867 operation, 1868 timeout_sec=_WAIT_FOR_OPERATION_SEC): 1869 start_time = time.time() 1870 while time.time() - start_time <= timeout_sec: 1871 result = gcp.compute.zoneOperations().get( 1872 project=gcp.project, zone=zone, 1873 operation=operation).execute(num_retries=_GCP_API_RETRIES) 1874 if result['status'] == 'DONE': 1875 if 'error' in result: 1876 raise Exception(result['error']) 1877 return 1878 time.sleep(2) 1879 raise Exception('Operation %s did not complete within %d' % 1880 (operation, timeout_sec)) 1881 1882 1883def wait_for_healthy_backends(gcp, 1884 backend_service, 1885 instance_group, 1886 timeout_sec=_WAIT_FOR_BACKEND_SEC): 1887 start_time = time.time() 1888 config = {'group': instance_group.url} 1889 instance_names = get_instance_names(gcp, instance_group) 1890 expected_size = len(instance_names) 1891 while time.time() - start_time <= timeout_sec: 1892 for instance_name in instance_names: 1893 try: 1894 status = get_serving_status(instance_name, gcp.service_port) 1895 logger.info('serving status response from %s: %s', 1896 instance_name, status) 1897 except grpc.RpcError as rpc_error: 1898 logger.info('checking serving status of %s failed: %s', 1899 instance_name, rpc_error) 1900 result = gcp.compute.backendServices().getHealth( 1901 project=gcp.project, 1902 backendService=backend_service.name, 1903 body=config).execute(num_retries=_GCP_API_RETRIES) 1904 if 'healthStatus' in result: 1905 logger.info('received GCP healthStatus: %s', result['healthStatus']) 1906 healthy = True 1907 for instance in result['healthStatus']: 1908 if instance['healthState'] != 'HEALTHY': 1909 healthy = False 1910 break 1911 if healthy and expected_size == len(result['healthStatus']): 1912 return 1913 else: 1914 logger.info('no healthStatus received from GCP') 1915 time.sleep(5) 1916 raise Exception('Not all backends became healthy within %d seconds: %s' % 1917 (timeout_sec, result)) 1918 1919 1920def get_instance_names(gcp, instance_group): 1921 instance_names = [] 1922 result = gcp.compute.instanceGroups().listInstances( 1923 project=gcp.project, 1924 zone=instance_group.zone, 1925 instanceGroup=instance_group.name, 1926 body={ 1927 'instanceState': 'ALL' 1928 }).execute(num_retries=_GCP_API_RETRIES) 1929 if 'items' not in result: 1930 return [] 1931 for item in result['items']: 1932 # listInstances() returns the full URL of the instance, which ends with 1933 # the instance name. compute.instances().get() requires using the 1934 # instance name (not the full URL) to look up instance details, so we 1935 # just extract the name manually. 1936 instance_name = item['instance'].split('/')[-1] 1937 instance_names.append(instance_name) 1938 logger.info('retrieved instance names: %s', instance_names) 1939 return instance_names 1940 1941 1942def clean_up(gcp): 1943 if gcp.global_forwarding_rule: 1944 delete_global_forwarding_rule(gcp) 1945 if gcp.target_proxy: 1946 delete_target_proxy(gcp) 1947 if gcp.url_map: 1948 delete_url_map(gcp) 1949 delete_backend_services(gcp) 1950 if gcp.health_check_firewall_rule: 1951 delete_firewall(gcp) 1952 if gcp.health_check: 1953 delete_health_check(gcp) 1954 delete_instance_groups(gcp) 1955 if gcp.instance_template: 1956 delete_instance_template(gcp) 1957 1958 1959class InstanceGroup(object): 1960 1961 def __init__(self, name, url, zone): 1962 self.name = name 1963 self.url = url 1964 self.zone = zone 1965 1966 1967class GcpResource(object): 1968 1969 def __init__(self, name, url): 1970 self.name = name 1971 self.url = url 1972 1973 1974class GcpState(object): 1975 1976 def __init__(self, compute, alpha_compute, project): 1977 self.compute = compute 1978 self.alpha_compute = alpha_compute 1979 self.project = project 1980 self.health_check = None 1981 self.health_check_firewall_rule = None 1982 self.backend_services = [] 1983 self.url_map = None 1984 self.target_proxy = None 1985 self.global_forwarding_rule = None 1986 self.service_port = None 1987 self.instance_template = None 1988 self.instance_groups = [] 1989 1990 1991alpha_compute = None 1992if args.compute_discovery_document: 1993 with open(args.compute_discovery_document, 'r') as discovery_doc: 1994 compute = googleapiclient.discovery.build_from_document( 1995 discovery_doc.read()) 1996 if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document: 1997 with open(args.alpha_compute_discovery_document, 'r') as discovery_doc: 1998 alpha_compute = googleapiclient.discovery.build_from_document( 1999 discovery_doc.read()) 2000else: 2001 compute = googleapiclient.discovery.build('compute', 'v1') 2002 if not args.only_stable_gcp_apis: 2003 alpha_compute = googleapiclient.discovery.build('compute', 'alpha') 2004 2005try: 2006 gcp = GcpState(compute, alpha_compute, args.project_id) 2007 gcp_suffix = args.gcp_suffix 2008 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 2009 if not args.use_existing_gcp_resources: 2010 if args.keep_gcp_resources: 2011 # Auto-generating a unique suffix in case of conflict should not be 2012 # combined with --keep_gcp_resources, as the suffix actually used 2013 # for GCP resources will not match the provided --gcp_suffix value. 2014 num_attempts = 1 2015 else: 2016 num_attempts = 5 2017 for i in range(num_attempts): 2018 try: 2019 logger.info('Using GCP suffix %s', gcp_suffix) 2020 create_health_check(gcp, health_check_name) 2021 break 2022 except googleapiclient.errors.HttpError as http_error: 2023 gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999)) 2024 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 2025 logger.exception('HttpError when creating health check') 2026 if gcp.health_check is None: 2027 raise Exception('Failed to create health check name after %d ' 2028 'attempts' % num_attempts) 2029 firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix 2030 backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix 2031 alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix 2032 url_map_name = _BASE_URL_MAP_NAME + gcp_suffix 2033 service_host_name = _BASE_SERVICE_HOST + gcp_suffix 2034 target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix 2035 forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix 2036 template_name = _BASE_TEMPLATE_NAME + gcp_suffix 2037 instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix 2038 same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix 2039 secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix 2040 if args.use_existing_gcp_resources: 2041 logger.info('Reusing existing GCP resources') 2042 get_health_check(gcp, health_check_name) 2043 try: 2044 get_health_check_firewall_rule(gcp, firewall_name) 2045 except googleapiclient.errors.HttpError as http_error: 2046 # Firewall rule may be auto-deleted periodically depending on GCP 2047 # project settings. 2048 logger.exception('Failed to find firewall rule, recreating') 2049 create_health_check_firewall_rule(gcp, firewall_name) 2050 backend_service = get_backend_service(gcp, backend_service_name) 2051 alternate_backend_service = get_backend_service( 2052 gcp, alternate_backend_service_name) 2053 get_url_map(gcp, url_map_name) 2054 get_target_proxy(gcp, target_proxy_name) 2055 get_global_forwarding_rule(gcp, forwarding_rule_name) 2056 get_instance_template(gcp, template_name) 2057 instance_group = get_instance_group(gcp, args.zone, instance_group_name) 2058 same_zone_instance_group = get_instance_group( 2059 gcp, args.zone, same_zone_instance_group_name) 2060 secondary_zone_instance_group = get_instance_group( 2061 gcp, args.secondary_zone, secondary_zone_instance_group_name) 2062 else: 2063 create_health_check_firewall_rule(gcp, firewall_name) 2064 backend_service = add_backend_service(gcp, backend_service_name) 2065 alternate_backend_service = add_backend_service( 2066 gcp, alternate_backend_service_name) 2067 create_url_map(gcp, url_map_name, backend_service, service_host_name) 2068 create_target_proxy(gcp, target_proxy_name) 2069 potential_service_ports = list(args.service_port_range) 2070 random.shuffle(potential_service_ports) 2071 create_global_forwarding_rule(gcp, forwarding_rule_name, 2072 potential_service_ports) 2073 if not gcp.service_port: 2074 raise Exception( 2075 'Failed to find a valid ip:port for the forwarding rule') 2076 if gcp.service_port != _DEFAULT_SERVICE_PORT: 2077 patch_url_map_host_rule_with_port(gcp, url_map_name, 2078 backend_service, 2079 service_host_name) 2080 startup_script = get_startup_script(args.path_to_server_binary, 2081 gcp.service_port) 2082 create_instance_template(gcp, template_name, args.network, 2083 args.source_image, args.machine_type, 2084 startup_script) 2085 instance_group = add_instance_group(gcp, args.zone, instance_group_name, 2086 _INSTANCE_GROUP_SIZE) 2087 patch_backend_service(gcp, backend_service, [instance_group]) 2088 same_zone_instance_group = add_instance_group( 2089 gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) 2090 secondary_zone_instance_group = add_instance_group( 2091 gcp, args.secondary_zone, secondary_zone_instance_group_name, 2092 _INSTANCE_GROUP_SIZE) 2093 2094 wait_for_healthy_backends(gcp, backend_service, instance_group) 2095 2096 if args.test_case: 2097 client_env = dict(os.environ) 2098 bootstrap_server_features = [] 2099 if gcp.service_port == _DEFAULT_SERVICE_PORT: 2100 server_uri = service_host_name 2101 else: 2102 server_uri = service_host_name + ':' + str(gcp.service_port) 2103 if args.xds_v3_support: 2104 client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true' 2105 bootstrap_server_features.append('xds_v3') 2106 if args.bootstrap_file: 2107 bootstrap_path = os.path.abspath(args.bootstrap_file) 2108 else: 2109 with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: 2110 bootstrap_file.write( 2111 _BOOTSTRAP_TEMPLATE.format( 2112 node_id=socket.gethostname(), 2113 server_features=json.dumps( 2114 bootstrap_server_features)).encode('utf-8')) 2115 bootstrap_path = bootstrap_file.name 2116 client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path 2117 client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true' 2118 test_results = {} 2119 failed_tests = [] 2120 for test_case in args.test_case: 2121 result = jobset.JobResult() 2122 log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case) 2123 if not os.path.exists(log_dir): 2124 os.makedirs(log_dir) 2125 test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME) 2126 test_log_file = open(test_log_filename, 'w+') 2127 client_process = None 2128 2129 if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS: 2130 rpcs_to_send = '--rpc="UnaryCall,EmptyCall"' 2131 else: 2132 rpcs_to_send = '--rpc="UnaryCall"' 2133 2134 if test_case in _TESTS_TO_SEND_METADATA: 2135 metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU}"'.format( 2136 keyE=_TEST_METADATA_KEY, 2137 valueE=_TEST_METADATA_VALUE_EMPTY, 2138 keyU=_TEST_METADATA_KEY, 2139 valueU=_TEST_METADATA_VALUE_UNARY) 2140 else: 2141 # Setting the arg explicitly to empty with '--metadata=""' 2142 # makes C# client fail 2143 # (see https://github.com/commandlineparser/commandline/issues/412), 2144 # so instead we just rely on clients using the default when 2145 # metadata arg is not specified. 2146 metadata_to_send = '' 2147 2148 # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks 2149 # in the client. This means we will ignore intermittent RPC 2150 # failures (but this framework still checks that the final result 2151 # is as expected). 2152 # 2153 # Reason for disabling this is, the resources are shared by 2154 # multiple tests, and a change in previous test could be delayed 2155 # until the second test starts. The second test may see 2156 # intermittent failures because of that. 2157 # 2158 # A fix is to not share resources between tests (though that does 2159 # mean the tests will be significantly slower due to creating new 2160 # resources). 2161 fail_on_failed_rpc = '' 2162 2163 try: 2164 if not CLIENT_HOSTS: 2165 client_cmd_formatted = args.client_cmd.format( 2166 server_uri=server_uri, 2167 stats_port=args.stats_port, 2168 qps=args.qps, 2169 fail_on_failed_rpc=fail_on_failed_rpc, 2170 rpcs_to_send=rpcs_to_send, 2171 metadata_to_send=metadata_to_send) 2172 logger.debug('running client: %s', client_cmd_formatted) 2173 client_cmd = shlex.split(client_cmd_formatted) 2174 client_process = subprocess.Popen(client_cmd, 2175 env=client_env, 2176 stderr=subprocess.STDOUT, 2177 stdout=test_log_file) 2178 if test_case == 'backends_restart': 2179 test_backends_restart(gcp, backend_service, instance_group) 2180 elif test_case == 'change_backend_service': 2181 test_change_backend_service(gcp, backend_service, 2182 instance_group, 2183 alternate_backend_service, 2184 same_zone_instance_group) 2185 elif test_case == 'gentle_failover': 2186 test_gentle_failover(gcp, backend_service, instance_group, 2187 secondary_zone_instance_group) 2188 elif test_case == 'ping_pong': 2189 test_ping_pong(gcp, backend_service, instance_group) 2190 elif test_case == 'remove_instance_group': 2191 test_remove_instance_group(gcp, backend_service, 2192 instance_group, 2193 same_zone_instance_group) 2194 elif test_case == 'round_robin': 2195 test_round_robin(gcp, backend_service, instance_group) 2196 elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': 2197 test_secondary_locality_gets_no_requests_on_partial_primary_failure( 2198 gcp, backend_service, instance_group, 2199 secondary_zone_instance_group) 2200 elif test_case == 'secondary_locality_gets_requests_on_primary_failure': 2201 test_secondary_locality_gets_requests_on_primary_failure( 2202 gcp, backend_service, instance_group, 2203 secondary_zone_instance_group) 2204 elif test_case == 'traffic_splitting': 2205 test_traffic_splitting(gcp, backend_service, instance_group, 2206 alternate_backend_service, 2207 same_zone_instance_group) 2208 elif test_case == 'path_matching': 2209 test_path_matching(gcp, backend_service, instance_group, 2210 alternate_backend_service, 2211 same_zone_instance_group) 2212 elif test_case == 'header_matching': 2213 test_header_matching(gcp, backend_service, instance_group, 2214 alternate_backend_service, 2215 same_zone_instance_group) 2216 elif test_case == 'circuit_breaking': 2217 test_circuit_breaking(gcp, backend_service, instance_group, 2218 same_zone_instance_group) 2219 else: 2220 logger.error('Unknown test case: %s', test_case) 2221 sys.exit(1) 2222 if client_process and client_process.poll() is not None: 2223 raise Exception( 2224 'Client process exited prematurely with exit code %d' % 2225 client_process.returncode) 2226 result.state = 'PASSED' 2227 result.returncode = 0 2228 except Exception as e: 2229 logger.exception('Test case %s failed', test_case) 2230 failed_tests.append(test_case) 2231 result.state = 'FAILED' 2232 result.message = str(e) 2233 finally: 2234 if client_process: 2235 if client_process.returncode: 2236 logger.info('Client exited with code %d' % 2237 client_process.returncode) 2238 else: 2239 client_process.terminate() 2240 test_log_file.close() 2241 # Workaround for Python 3, as report_utils will invoke decode() on 2242 # result.message, which has a default value of ''. 2243 result.message = result.message.encode('UTF-8') 2244 test_results[test_case] = [result] 2245 if args.log_client_output: 2246 logger.info('Client output:') 2247 with open(test_log_filename, 'r') as client_output: 2248 logger.info(client_output.read()) 2249 if not os.path.exists(_TEST_LOG_BASE_DIR): 2250 os.makedirs(_TEST_LOG_BASE_DIR) 2251 report_utils.render_junit_xml_report(test_results, 2252 os.path.join( 2253 _TEST_LOG_BASE_DIR, 2254 _SPONGE_XML_NAME), 2255 suite_name='xds_tests', 2256 multi_target=True) 2257 if failed_tests: 2258 logger.error('Test case(s) %s failed', failed_tests) 2259 sys.exit(1) 2260finally: 2261 if not args.keep_gcp_resources: 2262 logger.info('Cleaning up GCP resources. This may take some time.') 2263 clean_up(gcp) 2264