1#!/usr/bin/python
2
3from __future__ import print_function
4
5import argparse
6import logging
7import multiprocessing
8import subprocess
9import sys
10from multiprocessing.pool import ThreadPool
11
12import common
13from autotest_lib.server import frontend
14from autotest_lib.site_utils.lib import infra
15
16DEPLOY_SERVER_LOCAL = ('/usr/local/autotest/site_utils/deploy_server_local.py')
17POOL_SIZE = 124
18
19
20def _filter_servers(servers):
21    """Filter a set of servers to those that should be deployed to."""
22    non_push_roles = {'devserver', 'crash_server', 'reserve'}
23    for s in servers:
24        if s['status'] == 'repair_required':
25            continue
26        if s['status'] == 'backup':
27            continue
28        if set(s['roles']) & non_push_roles:
29            continue
30        yield s
31
32
33def discover_servers(afe):
34    """Discover the in-production servers to update.
35
36    Returns the set of servers from serverdb that are in production and should
37    be updated. This filters out servers in need of repair, or servers of roles
38    that are not yet supported by deploy_server / deploy_server_local.
39
40    @param afe: Server to contact with RPC requests.
41
42    @returns: A set of server hostnames.
43    """
44    # Example server details....
45    # {
46    #     'hostname': 'server1',
47    #     'status': 'backup',
48    #     'roles': ['drone', 'scheduler'],
49    #     'attributes': {'max_processes': 300}
50    # }
51    rpc = frontend.AFE(server=afe)
52    servers = rpc.run('get_servers')
53
54    return {s['hostname'] for s in _filter_servers(servers)}
55
56
57def _parse_arguments(args):
58    """Parse command line arguments.
59
60    @param args: The command line arguments to parse. (usually sys.argv[1:])
61
62    @returns A tuple of (argparse.Namespace populated with argument values,
63                         list of extra args to pass to deploy_server_local).
64    """
65    parser = argparse.ArgumentParser(
66            formatter_class=argparse.RawDescriptionHelpFormatter,
67            description='Run deploy_server_local on a bunch of servers. Extra '
68                        'arguments will be passed through.',
69            epilog=('Update all servers:\n'
70                    '  deploy_server.py -x --afe cautotest\n'
71                    '\n'
72                    'Update one server:\n'
73                    '  deploy_server.py <server> -x\n'
74                    ))
75
76    parser.add_argument('-x', action='store_true',
77                        help='Actually perform actions. If not supplied, '
78                             'script does nothing.')
79    parser.add_argument('--afe',
80            help='The AFE server used to get servers from server_db,'
81                 'e.g, cautotest. Used only if no SERVER specified.')
82    parser.add_argument('servers', action='store', nargs='*', metavar='SERVER')
83
84    return parser.parse_known_args()
85
86
87def _update_server(server, extra_args=[]):
88    """Run deploy_server_local for given server.
89
90    @param server: hostname to update.
91    @param extra_args: args to be passed in to deploy_server_local.
92
93    @return: A tuple of (server, success, output), where:
94             server: Name of the server.
95             sucess: True if update succeeds, False otherwise.
96             output: A string of the deploy_server_local script output
97                     including any errors.
98    """
99    cmd = ('%s %s' %
100           (DEPLOY_SERVER_LOCAL, ' '.join(extra_args)))
101    success = False
102    try:
103        output = infra.execute_command(server, cmd)
104        success = True
105    except subprocess.CalledProcessError as e:
106        output = e.output
107
108    return server, success, output
109
110def _update_in_parallel(servers, extra_args=[]):
111    """Update a group of servers in parallel.
112
113    @param servers: A list of servers to update.
114    @param options: Options for the push.
115
116    @returns A dictionary from server names that failed to the output
117             of the update script.
118    """
119    # Create a list to record all the finished servers.
120    manager = multiprocessing.Manager()
121    finished_servers = manager.list()
122
123    do_server = lambda s: _update_server(s, extra_args)
124
125    # The update actions run in parallel. If any update failed, we should wait
126    # for other running updates being finished. Abort in the middle of an update
127    # may leave the server in a bad state.
128    pool = ThreadPool(POOL_SIZE)
129    try:
130        results = pool.map_async(do_server, servers)
131        pool.close()
132
133        # Track the updating progress for current group of servers.
134        incomplete_servers = set()
135        server_names = set([s[0] for s in servers])
136        while not results.ready():
137            incomplete_servers = sorted(set(servers) - set(finished_servers))
138            print('Not finished yet. %d servers in this group. '
139                '%d servers are still running:\n%s\n' %
140                (len(servers), len(incomplete_servers), incomplete_servers))
141            # Check the progress every 20s
142            results.wait(20)
143
144        # After update finished, parse the result.
145        failures = {}
146        for server, success, output in results.get():
147            if not success:
148                failures[server] = output
149
150        return failures
151
152    finally:
153        pool.terminate()
154        pool.join()
155
156
157def main(args):
158    """Entry point to deploy_server.py
159
160    @param args: The command line arguments to parse. (usually sys.argv)
161
162    @returns The system exit code.
163    """
164    options, extra_args = _parse_arguments(args[1:])
165    # Remove all the handlers from the root logger to get rid of the handlers
166    # introduced by the import packages.
167    logging.getLogger().handlers = []
168    logging.basicConfig(level=logging.DEBUG)
169
170    servers = options.servers
171    if not servers:
172        if not options.afe:
173            print('No servers or afe specified. Aborting')
174            return 1
175        print('Retrieving servers from %s..' % options.afe)
176        servers = discover_servers(options.afe)
177        print('Retrieved servers were: %s' % servers)
178
179    if not options.x:
180        print('Doing nothing because -x was not supplied.')
181        print('servers: %s' % options.servers)
182        print('extra args for deploy_server_local: %s' % extra_args)
183        return 0
184
185    failures = _update_in_parallel(servers, extra_args)
186
187    if not failures:
188        print('Completed all updates successfully.')
189        return 0
190
191    print('The following servers failed, with the following output:')
192    for s, o in failures.iteritems():
193        print('======== %s ========' % s)
194        print(o)
195
196    print('The servers that failed were:')
197    print('\n'.join(failures.keys()))
198    print('\n\nTo retry on failed servers, run the following command:')
199    retry_cmd = [args[0], '-x'] + failures.keys() + extra_args
200    print(' '.join(retry_cmd))
201    return 1
202
203
204
205if __name__ == '__main__':
206    sys.exit(main(sys.argv))
207