1#!/usr/bin/env python2
3# Copyright 2017 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
7"""Load generator for devserver.
9Example usage:
11# Find DUTs in suites pool to test with:
12atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready
14# Lock DUTs:
15atest host mod -l -r 'quick provision testing' DUT1 DUT2
17# Create config file with DUTs to test and builds to use.
18cat >config.json <<EOD
20  "BOARD": {
21    "duts": [
22      "chromeosX-rowY-rackZ-hostA",
23      "chromeosX-rowY-rackZ-hostB",
24    ],
25    "versions": [
26      "auron_paine-paladin/R65-10208.0.0-rc2",
27      "auron_paine-paladin/R65-10208.0.0-rc3",
28      "auron_paine-paladin/R65-10209.0.0-rc1"
29    ]
30  },
34# Do 100 total provisions, aiming to have 10 active simultaneously.
35loadtest.py $DS config.json --simultaneous 10 --total 100
37# Unlock DUTs:
38atest host mod -u DUT1 DUT2
41import collections
42import datetime
43import json
44import random
45import re
46import signal
47import subprocess
48import sys
49import time
51import common
52from autotest_lib.client.common_lib import time_utils
53from autotest_lib.client.common_lib.cros import dev_server
54from chromite.lib import commandline
55from chromite.lib import cros_logging as logging
56from chromite.lib import locking
57from chromite.lib import parallel
59# Paylods to stage.
60PAYLOADS = ['quick_provision', 'stateful']
62# Number of seconds between full status checks.
65# Number of successes/failures to blacklist a DUT.
70def get_parser():
71    """Creates the argparse parser."""
72    parser = commandline.ArgumentParser(description=__doc__)
73    parser.add_argument('server', type=str, action='store',
74                        help='Devserver to load test.')
75    parser.add_argument('config', type=str, action='store',
76                        help='Path to JSON config file.'
77                             'Config file is indexed by board with keys of '
78                             '"duts" and "versions", each a list.')
79    parser.add_argument('--blacklist-consecutive', '-C', type=int,
80                        action='store',
81                        help=('Consecutive number of failures before '
82                              'blacklisting DUT (default %d).') %
83                             BLACKLIST_CONSECUTIVE_FAILURE,
84                        default=BLACKLIST_CONSECUTIVE_FAILURE)
85    parser.add_argument('--blacklist-success', '-S', type=int, action='store',
86                        help=('Total number of successes before blacklisting '
87                              'DUT (default %d).') % BLACKLIST_TOTAL_SUCCESS,
88                        default=BLACKLIST_TOTAL_SUCCESS)
89    parser.add_argument('--blacklist-total', '-T', type=int, action='store',
90                        help=('Total number of failures before blacklisting '
91                              'DUT (default %d).') % BLACKLIST_TOTAL_FAILURE,
92                        default=BLACKLIST_TOTAL_FAILURE)
93    parser.add_argument('--boards', '-b', type=str, action='store',
94                        help='Comma-separated list of boards to provision.')
95    parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
96                        help='Do not attempt to provision.')
97    parser.add_argument('--duts', '-d', type=str, action='store',
98                        help='Comma-separated list of duts to provision.')
99    parser.add_argument('--outputlog', '-l', type=str, action='store',
100                        help='Path to append JSON entries to.')
101    parser.add_argument('--output', '-o', type=str, action='store',
102                        help='Path to write JSON file to.')
103    parser.add_argument('--ping', '-p', action='store_true',
104                        help='Ping DUTs and blacklist unresponsive ones.')
105    parser.add_argument('--simultaneous', '-s', type=int, action='store',
106                        help='Number of simultaneous provisions to run.',
107                        default=1)
108    parser.add_argument('--no-stage', action='store_false',
109                        dest='stage', default=True,
110                        help='Do not attempt to stage builds.')
111    parser.add_argument('--total', '-t', type=int, action='store',
112                        help='Number of total provisions to run.',
113                        default=0)
114    return parser
116def make_entry(entry_id, name, status, start_time,
117               finish_time=None, parent=None, **kwargs):
118    """Generate an event log entry to be stored in Cloud Datastore.
120    @param entry_id: A (Kind, id) tuple representing the key.
121    @param name: A string identifying the event
122    @param status: A string identifying the status of the event.
123    @param start_time: A datetime of the start of the event.
124    @param finish_time: A datetime of the finish of the event.
125    @param parent: A (Kind, id) tuple representing the parent key.
127    @return A dictionary representing the entry suitable for dumping via JSON.
128    """
129    entry = {
130        'id': entry_id,
131        'name': name,
132        'status': status,
133        'start_time': time_utils.to_epoch_time(start_time),
134    }
135    if finish_time is not None:
136        entry['finish_time'] = time_utils.to_epoch_time(finish_time)
137    if parent is not None:
138        entry['parent'] = parent
139    return entry
141class Job(object):
142    """Tracks a single provision job."""
143    def __init__(self, ds, host_name, build_name,
144                 entry_id=0, parent=None, board=None,
145                 start_active=0,
146                 force_update=False, full_update=False,
147                 clobber_stateful=True, quick_provision=True,
148                 ping=False, dryrun=False):
150        self.ds = ds
151        self.host_name = host_name
152        self.build_name = build_name
154        self.entry_id = ('Job', entry_id)
155        self.parent = parent
156        self.board = board
157        self.start_active = start_active
158        self.end_active = None
159        self.check_active_sum = 0
160        self.check_active_count = 0
162        self.start_time = datetime.datetime.now()
163        self.finish_time = None
164        self.trigger_response = None
166        self.ping = ping
167        self.pre_ping = None
168        self.post_ping = None
170        self.kwargs = {
171            'host_name': host_name,
172            'build_name': build_name,
173            'force_update': force_update,
174            'full_update': full_update,
175            'clobber_stateful': clobber_stateful,
176            'quick_provision': quick_provision,
177        }
179        if dryrun:
180            self.finish_time = datetime.datetime.now()
181            self.raised_error = None
182            self.success = True
183            self.pid = 0
184        else:
185            if self.ping:
186                self.pre_ping = ping_dut(self.host_name)
187            self.trigger_response = ds._trigger_auto_update(**self.kwargs)
189    def as_entry(self):
190        """Generate an entry for exporting to datastore."""
191        entry = make_entry(self.entry_id, self.host_name,
192                           'pass' if self.success else 'fail',
193                           self.start_time, self.finish_time, self.parent)
194        entry.update({
195            'build_name': self.build_name,
196            'board': self.board,
197            'devserver': self.ds.hostname,
198            'start_active': self.start_active,
199            'end_active': self.end_active,
200            'force_update': self.kwargs['force_update'],
201            'full_update': self.kwargs['full_update'],
202            'clobber_stateful': self.kwargs['clobber_stateful'],
203            'quick_provision': self.kwargs['quick_provision'],
204            'elapsed': int(self.elapsed().total_seconds()),
205            'trigger_response': self.trigger_response,
206            'pre_ping': self.pre_ping,
207            'post_ping': self.post_ping,
208        })
209        if self.check_active_count:
210            entry['avg_active'] = (self.check_active_sum /
211                                   self.check_active_count)
212        return entry
214    def check(self, active_count):
215        """Checks if a job has completed.
217        @param active_count: Number of active provisions at time of the check.
218        @return: True if the job has completed, False otherwise.
219        """
220        if self.finish_time is not None:
221            return True
223        self.check_active_sum += active_count
224        self.check_active_count += 1
226        finished, raised_error, pid = self.ds.check_for_auto_update_finished(
227            self.trigger_response, wait=False, **self.kwargs)
228        if finished:
229            self.finish_time = datetime.datetime.now()
230            self.raised_error = raised_error
231            self.success = self.raised_error is None
232            self.pid = pid
233            self.end_active = active_count
234            if self.ping:
235                self.post_ping = ping_dut(self.host_name)
237        return finished
239    def elapsed(self):
240        """Determine the elapsed time of the task."""
241        finish_time = self.finish_time or datetime.datetime.now()
242        return finish_time - self.start_time
244class Runner(object):
245    """Parallel provision load test runner."""
246    def __init__(self, ds, duts, config, simultaneous=1, total=0,
247                 outputlog=None, ping=False, blacklist_consecutive=None,
248                 blacklist_success=None, blacklist_total=None, dryrun=False):
249        self.ds = ds
250        self.duts = duts
251        self.config = config
252        self.start_time = datetime.datetime.now()
253        self.finish_time = None
254        self.simultaneous = simultaneous
255        self.total = total
256        self.outputlog = outputlog
257        self.ping = ping
258        self.blacklist_consecutive = blacklist_consecutive
259        self.blacklist_success = blacklist_success
260        self.blacklist_total = blacklist_total
261        self.dryrun = dryrun
263        self.active = []
264        self.started = 0
265        self.completed = []
266        # Track DUTs which have failed multiple times.
267        self.dut_blacklist = set()
268        # Track versions of each DUT to provision in order.
269        self.last_versions = {}
271        # id for the parent entry.
272        # TODO: This isn't the most unique.
273        self.entry_id = ('Runner',
274                         int(time_utils.to_epoch_time(datetime.datetime.now())))
276        # ids for the job entries.
277        self.next_id = 0
279        if self.outputlog:
280            dump_entries_as_json([self.as_entry()], self.outputlog)
282    def signal_handler(self, signum, frame):
283        """Signal handle to dump current status."""
284        logging.info('Received signal %s', signum)
285        if signum == signal.SIGUSR1:
286            now = datetime.datetime.now()
287            logging.info('%d active provisions, %d completed provisions, '
288                         '%s elapsed:',
289                         len(self.active), len(self.completed),
290                         now - self.start_time)
291            for job in self.active:
292                logging.info('  %s -> %s, %s elapsed',
293                             job.host_name, job.build_name,
294                             now - job.start_time)
296    def as_entry(self):
297        """Generate an entry for exporting to datastore."""
298        entry = make_entry(self.entry_id, 'Runner', 'pass',
299                           self.start_time, self.finish_time)
300        entry.update({
301            'devserver': self.ds.hostname,
302        })
303        return entry
305    def get_completed_entries(self):
306        """Retrieves all completed jobs as entries for datastore."""
307        entries = [self.as_entry()]
308        entries.extend([job.as_entry() for job in self.completed])
309        return entries
311    def get_next_id(self):
312        """Get the next Job id."""
313        entry_id = self.next_id
314        self.next_id += 1
315        return entry_id
317    def spawn(self, host_name, build_name):
318        """Spawn a single provision job."""
319        job = Job(self.ds, host_name, build_name,
320                  entry_id=self.get_next_id(), parent=self.entry_id,
321                  board=self.get_dut_board_type(host_name),
322                  start_active=len(self.active), ping=self.ping,
323                  dryrun=self.dryrun)
324        self.active.append(job)
325        logging.info('Provision (%d) of %s to %s started',
326                     job.entry_id[1], job.host_name, job.build_name)
327        self.last_versions[host_name] = build_name
328        self.started += 1
330    def replenish(self):
331        """Replenish the number of active provisions to match goals."""
332        while ((self.simultaneous == 0 or
333                len(self.active) < self.simultaneous) and
334               (self.total == 0 or self.started < self.total)):
335            host_name = self.find_idle_dut()
336            if host_name:
337                build_name = self.find_build_for_dut(host_name)
338                self.spawn(host_name, build_name)
339            elif self.simultaneous:
340                logging.warn('Insufficient DUTs to satisfy goal')
341                return False
342            else:
343                return len(self.active) > 0
344        return True
346    def check_all(self):
347        """Check the status of outstanding provisions."""
348        still_active = []
349        for job in self.active:
350            if job.check(len(self.active)):
351                logging.info('Provision (%d) of %s to %s %s in %s: %s',
352                             job.entry_id[1], job.host_name, job.build_name,
353                             'completed' if job.success else 'failed',
354                             job.elapsed(), job.raised_error)
355                entry = job.as_entry()
356                logging.debug(json.dumps(entry))
357                if self.outputlog:
358                    dump_entries_as_json([entry], self.outputlog)
359                self.completed.append(job)
360                if self.should_blacklist(job.host_name):
361                    logging.error('Blacklisting DUT %s', job.host_name)
362                    self.dut_blacklist.add(job.host_name)
363            else:
364                still_active.append(job)
365        self.active = still_active
367    def should_blacklist(self, host_name):
368        """Determines if a given DUT should be blacklisted."""
369        jobs = [job for job in self.completed if job.host_name == host_name]
370        total = 0
371        consecutive = 0
372        successes = 0
373        for job in jobs:
374            if not job.success:
375                total += 1
376                consecutive += 1
377                if ((self.blacklist_total is not None and
378                     total >= self.blacklist_total) or
379                    (self.blacklist_consecutive is not None and
380                     consecutive >= self.blacklist_consecutive)):
381                    return True
382            else:
383                successes += 1
384                if (self.blacklist_success is not None and
385                    successes >= self.blacklist_success):
386                    return True
387                consecutive = 0
388        return False
390    def find_idle_dut(self):
391        """Find an idle DUT to provision.."""
392        active_duts = {job.host_name for job in self.active}
393        idle_duts = [d for d in self.duts
394                     if d not in active_duts | self.dut_blacklist]
395        return random.choice(idle_duts) if len(idle_duts) else None
397    def get_dut_board_type(self, host_name):
398        """Determine the board type of a DUT."""
399        return self.duts[host_name]
401    def get_board_versions(self, board):
402        """Determine the versions to provision for a board."""
403        return self.config[board]['versions']
405    def find_build_for_dut(self, host_name):
406        """Determine a build to provision on a DUT."""
407        board = self.get_dut_board_type(host_name)
408        versions = self.get_board_versions(board)
409        last_version = self.last_versions.get(host_name)
410        try:
411            last_index = versions.index(last_version)
412        except ValueError:
413            return versions[0]
414        return versions[(last_index + 1) % len(versions)]
416    def stage(self, build):
417        """Stage artifacts for a given build."""
418        logging.debug('Staging %s', build)
419        self.ds.stage_artifacts(build, PAYLOADS)
421    def stage_all(self):
422        """Stage all necessary artifacts."""
423        boards = set(self.duts.values())
424        logging.info('Staging for %d boards', len(boards))
425        funcs = []
426        for board in boards:
427            for build in self.get_board_versions(board):
428                funcs.append(lambda build_=build: self.stage(build_))
429        parallel.RunParallelSteps(funcs)
431    def loop(self):
432        """Run the main provision loop."""
433        # Install a signal handler for status updates.
434        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
435        signal.siginterrupt(signal.SIGUSR1, False)
437        try:
438            while True:
439                self.check_all()
440                if self.total != 0 and len(self.completed) >= self.total:
441                    break
442                if not self.replenish() and len(self.active) == 0:
443                    logging.error('Unable to replenish with no active '
444                                  'provisions')
445                    return False
446                logging.debug('%d provisions active', len(self.active))
447                time.sleep(STATUS_POLL_SECONDS)
448            return True
449        except KeyboardInterrupt:
450            return False
451        finally:
452            self.finish_time = datetime.datetime.now()
453            # Clean up signal handler.
454            signal.signal(signal.SIGUSR1, old_handler)
456    def elapsed(self):
457        """Determine the elapsed time of the task."""
458        finish_time = self.finish_time or datetime.datetime.now()
459        return finish_time - self.start_time
461def dump_entries_as_json(entries, output_file):
462    """Dump event log entries as json to a file.
464    @param entries: A list of event log entries to dump.
465    @param output_file: The file to write to.
466    """
467    # Write the entries out as JSON.
468    logging.debug('Dumping %d entries' % len(entries))
469    for e in entries:
470        json.dump(e, output_file, sort_keys=True)
471        output_file.write('\n')
472        output_file.flush()
474def ping_dut(hostname):
475    """Checks if a host is responsive to pings."""
476    if re.match('^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
477        hostname += '.cros'
479    response = subprocess.call(["/bin/ping", "-c1", "-w2", hostname],
480                               stdout=subprocess.PIPE)
481    return response == 0
483def main(argv):
484    """Load generator for a devserver."""
485    parser = get_parser()
486    options = parser.parse_args(argv)
488    # Parse devserver.
489    if options.server:
490        if re.match(r'^https?://', options.server):
491            server = options.server
492        else:
493            server = 'http://%s/' % options.server
494        ds = dev_server.ImageServer(server)
495    else:
496        parser.print_usage()
497        logging.error('Must specify devserver')
498        sys.exit(1)
500    # Parse config file and determine master list of duts and their board type,
501    # filtering by board type if specified.
502    duts = {}
503    if options.config:
504        with open(options.config, 'r') as f:
505            config = json.load(f)
506            boards = (options.boards.split(',')
507                      if options.boards else config.keys())
508            duts_specified = (set(options.duts.split(','))
509                              if options.duts else None)
510            for board in boards:
511                duts.update({dut: board for dut in config[board]['duts']
512                             if duts_specified is None or
513                                dut in duts_specified})
514        logging.info('Config file %s: %d boards, %d duts',
515                     options.config, len(boards), len(duts))
516    else:
517        parser.print_usage()
518        logging.error('Must specify config file')
519        sys.exit(1)
521    if options.ping:
522        logging.info('Performing ping tests')
523        duts_alive = {}
524        for dut, board in duts.items():
525            if ping_dut(dut):
526                duts_alive[dut] = board
527            else:
528                logging.error('Ignoring DUT %s (%s) for failing initial '
529                              'ping check', dut, board)
530        duts = duts_alive
531        logging.info('After ping tests: %d boards, %d duts', len(boards),
532                     len(duts))
534    # Set up the test runner and stage all the builds.
535    outputlog = open(options.outputlog, 'a') if options.outputlog else None
536    runner = Runner(ds, duts, config,
537                    simultaneous=options.simultaneous, total=options.total,
538                    outputlog=outputlog, ping=options.ping,
539                    blacklist_consecutive=options.blacklist_consecutive,
540                    blacklist_success=options.blacklist_success,
541                    blacklist_total=options.blacklist_total,
542                    dryrun=options.dryrun)
543    if options.stage:
544        runner.stage_all()
546    # Run all the provisions.
547    with locking.FileLock(options.config, blocking=True).lock():
548        completed = runner.loop()
549    logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
550                 runner.elapsed())
551    # Write all entries as JSON.
552    entries = runner.get_completed_entries()
553    if options.output:
554        with open(options.output, 'w') as f:
555            dump_entries_as_json(entries, f)
556    else:
557        dump_entries_as_json(entries, sys.stdout)
558    logging.info('Summary: %s',
559                 dict(collections.Counter([e['status'] for e in entries
560                                           if e['name'] != 'Runner'])))
562    # List blacklisted DUTs.
563    if runner.dut_blacklist:
564        logging.warn('Blacklisted DUTs:')
565        for host_name in runner.dut_blacklist:
566            logging.warn('  %s', host_name)
568if __name__ == '__main__':
569    sys.exit(main(sys.argv[1:]))