#!/usr/bin/env python
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run performance tests locally or remotely."""

from __future__ import print_function

import argparse
import collections
import itertools
import json
import multiprocessing
import os
import pipes
import re
import subprocess
import sys
import tempfile
import time
import traceback
import uuid
import six

import performance.scenario_config as scenario_config
import python_utils.jobset as jobset
import python_utils.report_utils as report_utils

_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
os.chdir(_ROOT)

_REMOTE_HOST_USERNAME = 'jenkins'
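# can be overridden via the -u/--remote_host_username flag in main()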


class QpsWorkerJob:
    """Encapsulates a qps worker server job."""

    def __init__(self, spec, language, host_and_port, perf_file_base_name=None):
        self._spec = spec
        self.language = language
        self.host_and_port = host_and_port
        self._job = None
        self.perf_file_base_name = perf_file_base_name

    def start(self):
        self._job = jobset.Job(
            self._spec, newline_on_success=True, travis=True, add_env={})

    def is_running(self):
        """Polls the job and returns True if it is still running."""
        return self._job and self._job.state() == jobset._RUNNING

    def kill(self):
        if self._job:
            self._job.kill()
            self._job = None


def create_qpsworker_job(language,
                         shortname=None,
                         port=10000,
                         remote_host=None,
                         perf_cmd=None):
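    """Creates a QpsWorkerJob (jobspec plus metadata) for a single worker.

    The returned job is not started; callers invoke start() on it later.
    """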
    cmdline = (language.worker_cmdline() + ['--driver_port=%s' % port])

    if remote_host:
        host_and_port = '%s:%s' % (remote_host, port)
    else:
        host_and_port = 'localhost:%s' % port

    perf_file_base_name = None
    if perf_cmd:
        perf_file_base_name = '%s-%s' % (host_and_port, shortname)
        # specify -o output file so perf.data gets collected when the worker is stopped
        cmdline = perf_cmd + ['-o', '%s-perf.data' % perf_file_base_name
                             ] + cmdline

    worker_timeout = 3 * 60
    if remote_host:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
        ssh_cmd = ['ssh']
        cmdline = ['timeout', '%s' % (worker_timeout + 30)] + cmdline
        ssh_cmd.extend([
            str(user_at_host),
            'cd ~/performance_workspace/grpc/ && python tools/run_tests/start_port_server.py && %s'
            % ' '.join(cmdline)
        ])
        cmdline = ssh_cmd

    jobspec = jobset.JobSpec(
        cmdline=cmdline,
        shortname=shortname,
        timeout_seconds=worker_timeout,  # workers get restarted after each scenario
        verbose_success=True)
    return QpsWorkerJob(jobspec, language, host_and_port, perf_file_base_name)
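

# Illustrative example (hypothetical host, default username): with
# remote_host='10.0.0.1', port=10000 and perf_cmd=['/usr/bin/perf', 'record',
# '-F', '99', '-g'], create_qpsworker_job() above produces a command roughly like
#   ssh jenkins@10.0.0.1 'cd ~/performance_workspace/grpc/ &&
#       python tools/run_tests/start_port_server.py &&
#       timeout 210 /usr/bin/perf record -F 99 -g
#       -o 10.0.0.1:10000-<shortname>-perf.data <worker cmd> --driver_port=10000'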


def create_scenario_jobspec(scenario_json,
                            workers,
                            remote_host=None,
                            bq_result_table=None,
                            server_cpu_load=0):
    """Creates jobspec for running one scenario with the QPS driver."""
    # setting QPS_WORKERS env variable here makes sure it works with SSH too.
    cmd = 'QPS_WORKERS="%s" ' % ','.join(workers)
    if bq_result_table:
        cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
    cmd += 'tools/run_tests/performance/run_qps_driver.sh '
    cmd += '--scenarios_json=%s ' % pipes.quote(
        json.dumps({
            'scenarios': [scenario_json]
        }))
    cmd += '--scenario_result_file=scenario_result.json '
    if server_cpu_load != 0:
        cmd += '--search_param=offered_load --initial_search_value=1000 --targeted_cpu_load=%d --stride=500 --error_tolerance=0.01' % server_cpu_load
    if remote_host:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host, pipes.quote(cmd))

    return jobset.JobSpec(
        cmdline=[cmd],
        shortname='qps_json_driver.%s' % scenario_json['name'],
        timeout_seconds=12 * 60,
        shell=True,
        verbose_success=True)
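

# Illustrative example (hypothetical worker addresses): for a local run the
# jobspec above boils down to a shell command along the lines of
#   QPS_WORKERS="localhost:10000,localhost:10010" \
#       tools/run_tests/performance/run_qps_driver.sh \
#       --scenarios_json='{"scenarios": [...]}' \
#       --scenario_result_file=scenario_result.json
# With --remote_driver_host the whole command is additionally wrapped in ssh.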


def create_quit_jobspec(workers, remote_host=None):
    """Creates jobspec that tells QPS workers to quit, using the QPS driver."""
    # setting QPS_WORKERS env variable here makes sure it works with SSH too.
    cmd = 'QPS_WORKERS="%s" bins/opt/qps_json_driver --quit' % ','.join(
        w.host_and_port for w in workers)
    if remote_host:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host, pipes.quote(cmd))

    return jobset.JobSpec(
        cmdline=[cmd],
        shortname='qps_json_driver.quit',
        timeout_seconds=3 * 60,
        shell=True,
        verbose_success=True)


def create_netperf_jobspec(server_host='localhost',
                           client_host=None,
                           bq_result_table=None):
    """Creates jobspec for running the netperf benchmark."""
    cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host
    if bq_result_table:
        cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
    if client_host:
        # If netperf is running remotely, the env variables populated by Jenkins
        # won't be available on the client, but we need them for uploading results
        # to BigQuery.
        jenkins_job_name = os.getenv('JOB_NAME')
        if jenkins_job_name:
            cmd += 'JOB_NAME="%s" ' % jenkins_job_name
        jenkins_build_number = os.getenv('BUILD_NUMBER')
        if jenkins_build_number:
            cmd += 'BUILD_NUMBER="%s" ' % jenkins_build_number

    cmd += 'tools/run_tests/performance/run_netperf.sh'
    if client_host:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, client_host)
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host, pipes.quote(cmd))

    return jobset.JobSpec(
        cmdline=[cmd],
        shortname='netperf',
        timeout_seconds=60,
        shell=True,
        verbose_success=True)


def archive_repo(languages):
    """Archives local version of repo including submodules."""
    # Directory contains symlinks that can't be correctly untarred on Windows
    # so we just skip them as a workaround.
    # See https://github.com/grpc/grpc/issues/16334
    bad_symlinks_dir = '../grpc/third_party/libcxx/test/std/experimental/filesystem/Inputs/static_test_env'
    cmdline = [
        'tar', '--exclude', bad_symlinks_dir, '-cf', '../grpc.tar', '../grpc/'
    ]
    if 'java' in languages:
        cmdline.append('../grpc-java')
    if 'go' in languages:
        cmdline.append('../grpc-go')
    if 'node' in languages or 'node_purejs' in languages:
        cmdline.append('../grpc-node')

    archive_job = jobset.JobSpec(
        cmdline=cmdline, shortname='archive_repo', timeout_seconds=3 * 60)

    jobset.message('START', 'Archiving local repository.', do_newline=True)
    num_failures, _ = jobset.run(
        [archive_job], newline_on_success=True, maxjobs=1)
    if num_failures == 0:
        jobset.message(
            'SUCCESS',
            'Archive with local repository created successfully.',
            do_newline=True)
    else:
        jobset.message(
            'FAILED', 'Failed to archive local repository.', do_newline=True)
        sys.exit(1)


def prepare_remote_hosts(hosts, prepare_local=False):
    """Prepares remote hosts (and maybe prepares localhost as well)."""
    prepare_timeout = 10 * 60
    prepare_jobs = []
    for host in hosts:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
        prepare_jobs.append(
            jobset.JobSpec(
                cmdline=['tools/run_tests/performance/remote_host_prepare.sh'],
                shortname='remote_host_prepare.%s' % host,
                environ={'USER_AT_HOST': user_at_host},
                timeout_seconds=prepare_timeout))
    if prepare_local:
        # Prepare localhost as well
        prepare_jobs.append(
            jobset.JobSpec(
                cmdline=['tools/run_tests/performance/kill_workers.sh'],
                shortname='local_prepare',
                timeout_seconds=prepare_timeout))
    jobset.message('START', 'Preparing hosts.', do_newline=True)
    num_failures, _ = jobset.run(
        prepare_jobs, newline_on_success=True, maxjobs=10)
    if num_failures == 0:
        jobset.message(
            'SUCCESS', 'Prepare step completed successfully.', do_newline=True)
    else:
        jobset.message(
            'FAILED', 'Failed to prepare remote hosts.', do_newline=True)
        sys.exit(1)


def build_on_remote_hosts(hosts,
                          languages=scenario_config.LANGUAGES.keys(),
                          build_local=False):
    """Builds performance workers on remote hosts (and maybe also locally)."""
    build_timeout = 45 * 60
    # Kokoro VMs (which are local only) do not have caching, so they need more time to build
    local_build_timeout = 60 * 60
    build_jobs = []
    for host in hosts:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
        build_jobs.append(
            jobset.JobSpec(
                cmdline=['tools/run_tests/performance/remote_host_build.sh'] +
                languages,
                shortname='remote_host_build.%s' % host,
                environ={'USER_AT_HOST': user_at_host,
                         'CONFIG': 'opt'},
                timeout_seconds=build_timeout))
    if build_local:
        # Build locally as well
        build_jobs.append(
            jobset.JobSpec(
                cmdline=['tools/run_tests/performance/build_performance.sh'] +
                languages,
                shortname='local_build',
                environ={'CONFIG': 'opt'},
                timeout_seconds=local_build_timeout))
    jobset.message('START', 'Building.', do_newline=True)
    num_failures, _ = jobset.run(
        build_jobs, newline_on_success=True, maxjobs=10)
    if num_failures == 0:
        jobset.message('SUCCESS', 'Built successfully.', do_newline=True)
    else:
        jobset.message('FAILED', 'Build failed.', do_newline=True)
        sys.exit(1)


def create_qpsworkers(languages, worker_hosts, perf_cmd=None):
    """Creates QPS workers (but does not start them)."""
    if not worker_hosts:
        # run two workers locally (for each language)
        workers = [(None, 10000), (None, 10010)]
    elif len(worker_hosts) == 1:
        # run two workers on the remote host (for each language)
        workers = [(worker_hosts[0], 10000), (worker_hosts[0], 10010)]
    else:
        # run one worker per remote host (for each language)
        workers = [(worker_host, 10000) for worker_host in worker_hosts]

    return [
        create_qpsworker_job(
            language,
            shortname='qps_worker_%s_%s' % (language, worker_idx),
            port=worker[1] + language.worker_port_offset(),
            remote_host=worker[0],
            perf_cmd=perf_cmd)
        for language in languages
        for worker_idx, worker in enumerate(workers)
    ]
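

# Illustrative example (summarizing the logic above, not from a real run): with
# two languages selected and no --remote_worker_host, create_qpsworkers() yields
# four jobs: two local workers per language on ports 10000 and 10010, each
# shifted by that language's worker_port_offset().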


def perf_report_processor_job(worker_host, perf_base_name, output_filename,
                              flame_graph_reports):
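    """Creates jobspec that turns a worker's perf.data into a flame graph report."""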
    print('Creating perf report collection job for %s' % worker_host)
    cmd = ''
    if worker_host != 'localhost':
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, worker_host)
        cmd = "USER_AT_HOST=%s OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s tools/run_tests/performance/process_remote_perf_flamegraphs.sh" % (
            user_at_host, output_filename, flame_graph_reports, perf_base_name)
    else:
        cmd = "OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s tools/run_tests/performance/process_local_perf_flamegraphs.sh" % (
            output_filename, flame_graph_reports, perf_base_name)

    return jobset.JobSpec(
        cmdline=cmd,
        timeout_seconds=3 * 60,
        shell=True,
        verbose_success=True,
        shortname='process perf report')


Scenario = collections.namedtuple('Scenario', 'jobspec workers name')
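# jobspec: driver JobSpec for the scenario; workers: QpsWorkerJob instances the
# scenario needs running; name: scenario name used for reporting.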


def create_scenarios(languages,
                     workers_by_lang,
                     remote_host=None,
                     regex='.*',
                     category='all',
                     bq_result_table=None,
                     netperf=False,
                     netperf_hosts=[],
                     server_cpu_load=0):
    """Create jobspecs for scenarios to run."""
    all_workers = [
        worker for workers in workers_by_lang.values() for worker in workers
    ]
    scenarios = []
    _NO_WORKERS = []
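    # the netperf scenario runs outside the QPS worker framework, so it gets an
    # empty worker list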

    if netperf:
        if not netperf_hosts:
            netperf_server = 'localhost'
            netperf_client = None
        elif len(netperf_hosts) == 1:
            netperf_server = netperf_hosts[0]
            netperf_client = netperf_hosts[0]
        else:
            netperf_server = netperf_hosts[0]
            netperf_client = netperf_hosts[1]
        scenarios.append(
            Scenario(
                create_netperf_jobspec(
                    server_host=netperf_server,
                    client_host=netperf_client,
                    bq_result_table=bq_result_table), _NO_WORKERS, 'netperf'))

    for language in languages:
        for scenario_json in language.scenarios():
            if re.search(regex, scenario_json['name']):
                categories = scenario_json.get('CATEGORIES',
                                               ['scalable', 'smoketest'])
                if category in categories or category == 'all':
                    workers = workers_by_lang[str(language)][:]
                    # 'SERVER_LANGUAGE' is an indicator for this script to pick
                    # a server in a different language.
                    custom_server_lang = scenario_json.get(
                        'SERVER_LANGUAGE', None)
                    custom_client_lang = scenario_json.get(
                        'CLIENT_LANGUAGE', None)
                    scenario_json = scenario_config.remove_nonproto_fields(
                        scenario_json)
                    if custom_server_lang and custom_client_lang:
                        raise Exception(
                            'Cannot set both custom CLIENT_LANGUAGE and SERVER_LANGUAGE '
                            'in the same scenario')
                    if custom_server_lang:
                        if not workers_by_lang.get(custom_server_lang, []):
                            print('Warning: Skipping scenario %s as' %
                                  scenario_json['name'])
                            print(
                                'SERVER_LANGUAGE is set to %s yet the language has '
                                'not been selected with -l' %
                                custom_server_lang)
                            continue
                        for idx in range(0, scenario_json['num_servers']):
                            # replace first X workers by workers of a different language
                            workers[idx] = workers_by_lang[custom_server_lang][
                                idx]
                    if custom_client_lang:
                        if not workers_by_lang.get(custom_client_lang, []):
                            print('Warning: Skipping scenario %s as' %
                                  scenario_json['name'])
                            print(
                                'CLIENT_LANGUAGE is set to %s yet the language has '
                                'not been selected with -l' %
                                custom_client_lang)
                            continue
                        for idx in range(scenario_json['num_servers'],
                                         len(workers)):
                            # replace all client workers by workers of a different language,
                            # leaving the first num_servers workers as server workers.
                            workers[idx] = workers_by_lang[custom_client_lang][
                                idx]
                    scenario = Scenario(
                        create_scenario_jobspec(
                            scenario_json, [w.host_and_port for w in workers],
                            remote_host=remote_host,
                            bq_result_table=bq_result_table,
                            server_cpu_load=server_cpu_load), workers,
                        scenario_json['name'])
                    scenarios.append(scenario)

    return scenarios


def finish_qps_workers(jobs, qpsworker_jobs):
    """Waits for given jobs to finish and eventually kills them."""
    retries = 0
    num_killed = 0
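    # poll roughly every 3 seconds; if workers are still alive after more than
    # 10 polls (~30 seconds), start killing them forcibly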
    while any(job.is_running() for job in jobs):
        for job in qpsworker_jobs:
            if job.is_running():
                print('QPS worker "%s" is still running.' % job.host_and_port)
        if retries > 10:
            print('Killing all QPS workers.')
            for job in jobs:
                job.kill()
                num_killed += 1
        retries += 1
        time.sleep(3)
    print('All QPS workers finished.')
    return num_killed


profile_output_files = []
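# names of the .svg flame graph files produced so far; rendered into index.html
# at the end of the run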


# Collect perf text reports and flamegraphs if perf_cmd was used.
# Note that the base names of perf text reports are used when creating and
# processing perf data. The scenario name makes the output name unique in the
# final perf reports directory.
# Also, the perf profiles need to be fetched and processed after each scenario
# in order to avoid clobbering the output files.
def run_collect_perf_profile_jobs(hosts_and_base_names, scenario_name,
                                  flame_graph_reports):
    perf_report_jobs = []
    global profile_output_files
    for host_and_port in hosts_and_base_names:
        perf_base_name = hosts_and_base_names[host_and_port]
        output_filename = '%s-%s' % (scenario_name, perf_base_name)
        # from the base filename, create .svg output filename
        host = host_and_port.split(':')[0]
        profile_output_files.append('%s.svg' % output_filename)
        perf_report_jobs.append(
            perf_report_processor_job(host, perf_base_name, output_filename,
                                      flame_graph_reports))

    jobset.message(
        'START', 'Collecting perf reports from qps workers', do_newline=True)
    failures, _ = jobset.run(
        perf_report_jobs, newline_on_success=True, maxjobs=1)
    jobset.message(
        'END', 'Collecting perf reports from qps workers', do_newline=True)
    return failures


def main():
    argp = argparse.ArgumentParser(description='Run performance tests.')
    argp.add_argument(
        '-l',
        '--language',
        choices=['all'] + sorted(scenario_config.LANGUAGES.keys()),
        nargs='+',
        required=True,
        help='Languages to benchmark.')
    argp.add_argument(
        '--remote_driver_host',
        default=None,
        help=
        'Run QPS driver on given host. By default, QPS driver is run locally.')
    argp.add_argument(
        '--remote_worker_host',
        nargs='+',
        default=[],
        help='Hosts on which to start QPS workers.')
    argp.add_argument(
        '--dry_run',
        default=False,
        action='store_const',
        const=True,
        help='Just list scenarios to be run, but don\'t run them.')
    argp.add_argument(
        '-r',
        '--regex',
        default='.*',
        type=str,
        help='Regex to select scenarios to run.')
    argp.add_argument(
        '--bq_result_table',
        default=None,
        type=str,
        help='BigQuery "dataset.table" to upload results to.')
    argp.add_argument(
        '--category',
        choices=['smoketest', 'all', 'scalable', 'sweep'],
        default='all',
        help='Select a category of tests to run.')
    argp.add_argument(
        '--netperf',
        default=False,
        action='store_const',
        const=True,
        help='Run netperf benchmark as one of the scenarios.')
    argp.add_argument(
        '--server_cpu_load',
        default=0,
        type=int,
        help='Select a targeted server CPU load to run. 0 means ignore this flag.'
    )
    argp.add_argument(
        '-x',
        '--xml_report',
        default='report.xml',
        type=str,
        help='Name of XML report file to generate.')
    argp.add_argument(
        '--perf_args',
        help=('Example usage: "--perf_args=record -F 99 -g". '
              'Wrap QPS workers in a perf command '
              'with the arguments to perf specified here. '
              '".svg" flame graph profiles will be '
              'created for each QPS worker on each scenario. '
              'Files will be output to the "<repo_root>/<args.flame_graph_reports>" '
              'directory. Output files from running the worker '
              'under perf are saved in the repo root where it is run. '
              'Note that the perf "-g" flag is necessary for '
              'flame graph generation to work (assuming the binary '
              'being profiled uses frame pointers; check out '
              'the "--call-graph dwarf" option using libunwind otherwise.) '
              'Also note that the entire "--perf_args=<arg(s)>" must '
              'be wrapped in quotes as in the example usage. '
              'If "--perf_args" is unspecified, "perf" will '
              'not be used at all. '
              'See http://www.brendangregg.com/perf.html '
              'for more general perf examples.'))
    argp.add_argument(
        '--skip_generate_flamegraphs',
        default=False,
        action='store_const',
        const=True,
        help=('Turn flame graph generation off. '
              'May be useful if "perf_args" arguments do not make sense for '
              'generating flamegraphs (e.g., "--perf_args=stat ...")'))
    argp.add_argument(
        '-f',
        '--flame_graph_reports',
        default='perf_reports',
        type=str,
        help=
        'Name of directory to output flame graph profiles to, if any are created.'
    )
    argp.add_argument(
        '-u',
        '--remote_host_username',
        default='',
        type=str,
        help='Use a username other than the default "jenkins" to SSH into remote workers.')

    args = argp.parse_args()

    global _REMOTE_HOST_USERNAME
    if args.remote_host_username:
        _REMOTE_HOST_USERNAME = args.remote_host_username

    languages = set(
        scenario_config.LANGUAGES[l]
        for l in itertools.chain.from_iterable(
            six.iterkeys(scenario_config.LANGUAGES) if x == 'all' else [x]
            for x in args.language))
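    # 'all' expands to every language defined in scenario_config.LANGUAGES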

    # Put together the set of remote hosts where we will build and run.
    remote_hosts = set()
    if args.remote_worker_host:
        for host in args.remote_worker_host:
            remote_hosts.add(host)
    if args.remote_driver_host:
        remote_hosts.add(args.remote_driver_host)

    if not args.dry_run:
        if remote_hosts:
            archive_repo(languages=[str(l) for l in languages])
            prepare_remote_hosts(remote_hosts, prepare_local=True)
        else:
            prepare_remote_hosts([], prepare_local=True)

    build_local = False
    if not args.remote_driver_host:
        build_local = True
    if not args.dry_run:
        build_on_remote_hosts(
            remote_hosts,
            languages=[str(l) for l in languages],
            build_local=build_local)

    perf_cmd = None
    if args.perf_args:
        print('Running workers under perf profiler')
        # Expect /usr/bin/perf to be installed here, as is usual
        perf_cmd = ['/usr/bin/perf']
        perf_cmd.extend(re.split(r'\s+', args.perf_args))

    qpsworker_jobs = create_qpsworkers(
        languages, args.remote_worker_host, perf_cmd=perf_cmd)

    # get list of worker addresses for each language.
    workers_by_lang = dict([(str(language), []) for language in languages])
    for job in qpsworker_jobs:
        workers_by_lang[str(job.language)].append(job)

    scenarios = create_scenarios(
        languages,
        workers_by_lang=workers_by_lang,
        remote_host=args.remote_driver_host,
        regex=args.regex,
        category=args.category,
        bq_result_table=args.bq_result_table,
        netperf=args.netperf,
        netperf_hosts=args.remote_worker_host,
        server_cpu_load=args.server_cpu_load)

    if not scenarios:
        raise Exception('No scenarios to run')

    total_scenario_failures = 0
    qps_workers_killed = 0
    merged_resultset = {}
    perf_report_failures = 0
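
    # Run each scenario: start its workers, run the driver job followed by a
    # quit job, then collect perf profiles if profiling is enabled.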
    for scenario in scenarios:
        if args.dry_run:
            print(scenario.name)
        else:
            scenario_failures = 0
            try:
                for worker in scenario.workers:
                    worker.start()
                jobs = [scenario.jobspec]
                if scenario.workers:
                    jobs.append(
                        create_quit_jobspec(
                            scenario.workers,
                            remote_host=args.remote_driver_host))
                scenario_failures, resultset = jobset.run(
                    jobs, newline_on_success=True, maxjobs=1)
                total_scenario_failures += scenario_failures
                merged_resultset = dict(
                    itertools.chain(
                        six.iteritems(merged_resultset),
                        six.iteritems(resultset)))
            finally:
                # Consider qps workers that need to be killed as failures
                qps_workers_killed += finish_qps_workers(
                    scenario.workers, qpsworker_jobs)

            if perf_cmd and scenario_failures == 0 and not args.skip_generate_flamegraphs:
                workers_and_base_names = {}
                for worker in scenario.workers:
                    if not worker.perf_file_base_name:
                        raise Exception(
                            'using perf but perf report filename is unspecified'
                        )
                    workers_and_base_names[
                        worker.host_and_port] = worker.perf_file_base_name
                perf_report_failures += run_collect_perf_profile_jobs(
                    workers_and_base_names, scenario.name,
                    args.flame_graph_reports)

    # Still write the index.html even if some scenarios failed.
    # 'profile_output_files' will only have names for scenarios that passed
    if perf_cmd and not args.skip_generate_flamegraphs:
        # write the index file to the output dir, with all profiles from all scenarios/workers
        report_utils.render_perf_profiling_results(
            '%s/index.html' % args.flame_graph_reports, profile_output_files)

    report_utils.render_junit_xml_report(
        merged_resultset, args.xml_report, suite_name='benchmarks')

    if total_scenario_failures > 0 or qps_workers_killed > 0:
        print('%s scenarios failed and %s qps worker jobs killed' %
              (total_scenario_failures, qps_workers_killed))
        sys.exit(1)

    if perf_report_failures > 0:
        print('%s perf profile collection jobs failed' % perf_report_failures)
        sys.exit(1)


if __name__ == "__main__":
    main()