"""A simple script to backfill tko_task_references table with throttling."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import collections
import contextlib
import logging
import time

import MySQLdb


class BackfillException(Exception):
    """Raised when the backfill cannot start or an insert fails verification."""


def _parse_args():
    """Parse and validate the command line.

    In dryrun mode, when no iteration limit was given, the run is capped at 5
    iterations.

    Returns:
        The argparse.Namespace with host, user, password, dryrun,
        num_iterations, batch_size and sleep_seconds.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--host', required=True, help='mysql server host')
    parser.add_argument('--user', required=True, help='mysql server user')
    parser.add_argument('--password', required=True,
                        help='mysql server password')
    parser.add_argument('--dryrun', action='store_true', default=False)
    parser.add_argument(
            '--num-iterations',
            default=None,
            type=int,
            help='If set, total number of iterations. Default is no limit.',
    )
    parser.add_argument(
            '--batch-size',
            # Parsed as an int so that a value given on the command line has
            # the same type as the default and is validated before it reaches
            # the LIMIT clause.
            type=int,
            default=1000,
            help='Number of tko_jobs rows to read in one iteration',
    )
    parser.add_argument(
            '--sleep-seconds',
            type=int,
            default=1,
            help='Time to sleep between iterations',
    )

    args = parser.parse_args()
    if args.batch_size < 1:
        # A non-positive LIMIT either errors out or returns no rows, which the
        # main loop would misreport as "All done!".
        parser.error('--batch-size must be a positive integer')
    if args.dryrun and not args.num_iterations:
        logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.')
        args.num_iterations = 5
    return args


@contextlib.contextmanager
def _mysql_connection(args):
    """Yield a connection with chromeos_autotest_db selected; close it on exit.

    Args:
        args: Parsed arguments providing user, host and password.
    """
    conn = MySQLdb.connect(user=args.user, host=args.host,
                           passwd=args.password)
    try:
        # Selecting the database happens inside the try block so that the
        # connection is closed even when the USE statement fails.
        with _mysql_cursor(conn) as c:
            c.execute('USE chromeos_autotest_db;')
        yield conn
    finally:
        conn.close()


@contextlib.contextmanager
def _autocommit(conn):
    """Commit on a clean exit of the with-body; roll back and re-raise on error.

    Args:
        conn: An open connection providing commit() and rollback().
    """
    try:
        yield conn
    except BaseException:
        conn.rollback()
        # Propagate the failure: swallowing it here would let the caller
        # carry on as if the statement had been committed.
        raise
    else:
        conn.commit()


@contextlib.contextmanager
def _mysql_cursor(conn):
    """Yield a cursor on conn and close it when the with-body exits."""
    c = conn.cursor()
    try:
        yield c
    finally:
        c.close()


def _latest_unfilled_job_idx(conn):
    """Find the tko_job_idx from which the backfill should start.

    The backfill walks job indices downwards. When tko_task_references already
    has rows, the start is one below its smallest tko_job_idx; otherwise it is
    the largest job_idx in tko_jobs.

    Args:
        conn: An open connection.

    Returns:
        The starting index as a decimal string, or None when both tables are
        empty.
    """
    with _mysql_cursor(conn) as c:
        c.execute("""
SELECT tko_job_idx
FROM tko_task_references
ORDER BY tko_job_idx
LIMIT 1
;""")
        r = c.fetchall()
    if r:
        return str(int(r[0][0]) - 1)

    logging.debug('tko_task_references is empty.'
                  ' Grabbing the latest tko_job_idx to fill.')
    with _mysql_cursor(conn) as c:
        c.execute("""
SELECT job_idx
FROM tko_jobs
ORDER BY job_idx DESC
LIMIT 1
;""")
        r = c.fetchall()
    if r:
        # Same type as the branch above so callers see one return type.
        return str(int(r[0][0]))
    return None


_TKOTaskReference = collections.namedtuple(
        '_TKOTaskReference',
        ['tko_job_idx', 'task_reference', 'parent_task_reference'],
)

_SQL_SELECT_TASK_REFERENCES = """
SELECT job_idx, afe_job_id, afe_parent_job_id
FROM tko_jobs
WHERE job_idx <= %(latest_job_idx)s
ORDER BY job_idx DESC
LIMIT %(batch_size)s
;"""
_SQL_INSERT_TASK_REFERENCES = """
INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id)
VALUES %(values)s
;"""
_SQL_SELECT_TASK_REFERENCE = """
SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s
;"""


def _compute_task_references(conn, latest_job_idx, batch_size):
    """Read one batch of tko_jobs rows at or below latest_job_idx.

    Args:
        conn: An open connection.
        latest_job_idx: Highest job_idx to include (int or decimal string).
        batch_size: Maximum number of rows to read.

    Returns:
        A list of _TKOTaskReference in descending job_idx order; empty when no
        rows match.
    """
    # The statement is assembled by string formatting, so both values are
    # forced through int() to guarantee only numeric literals are spliced in.
    sql = _SQL_SELECT_TASK_REFERENCES % {
            'latest_job_idx': int(latest_job_idx),
            'batch_size': int(batch_size),
    }
    with _mysql_cursor(conn) as c:
        c.execute(sql)
        rs = c.fetchall()
    if not rs:
        return []
    return [_TKOTaskReference(r[0], r[1], r[2]) for r in rs]


def _insert_task_references(conn, task_references, dryrun):
    """Insert task_references into tko_task_references in one statement.

    Args:
        conn: An open connection.
        task_references: Sequence of _TKOTaskReference to insert.
        dryrun: When true, only log (a truncated form of) the statement that
            would have been executed; nothing is written.
    """
    if not task_references:
        # An empty VALUES list is not valid SQL; there is nothing to do.
        return

    # NOTE(review): a NULL afe_job_id / afe_parent_job_id arrives here as None
    # and is stored as the literal string "None" rather than NULL. Confirm
    # against the table schema whether NULL is the intended value.
    values = ', '.join([
            '("afe", %s, "%s", "%s")' %
            (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference)
            for tr in task_references
    ])
    sql = _SQL_INSERT_TASK_REFERENCES % {'values': values}
    if dryrun:
        if len(sql) < 200:
            sql_log = sql
        else:
            sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
        logging.debug('Would have run: %s', sql_log)
        # A dry run must not touch the table.
        return
    with _autocommit(conn) as conn:
        with _mysql_cursor(conn) as c:
            c.execute(sql)


def _verify_task_references(conn, task_references):
    """Spot-check an insert by reading back the last reference of the batch.

    Args:
        conn: An open connection.
        task_references: The batch that was just inserted.

    Raises:
        BackfillException: If the last reference is not found in
            tko_task_references.
    """
    # Just verify that the last one was inserted.
    if not task_references:
        return
    tko_job_idx = task_references[-1].tko_job_idx
    sql = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': tko_job_idx}
    with _mysql_cursor(conn) as c:
        c.execute(sql)
        r = c.fetchall()
    if not r or r[0][0] != tko_job_idx:
        raise BackfillException(
                'Failed to insert task reference for tko_job_id %s' %
                tko_job_idx)


def _next_job_idx(task_references):
    """Return, as a string, the index just below the last one in the batch."""
    return str(int(task_references[-1].tko_job_idx) - 1)


def main():
    """Backfill tko_task_references batch by batch, sleeping between batches.

    Each step (read, insert, verify) uses its own short-lived connection.

    Raises:
        BackfillException: If no starting index can be determined or an insert
            fails verification.
    """
    logging.basicConfig(level=logging.DEBUG)
    args = _parse_args()
    with _mysql_connection(args) as conn:
        tko_job_idx = _latest_unfilled_job_idx(conn)
    if tko_job_idx is None:
        raise BackfillException('Failed to get last unfilled tko_job_idx')
    logging.info('First tko_job_idx to fill: %s', tko_job_idx)

    # Count down on a local so the parsed arguments are left untouched.
    iterations_left = args.num_iterations
    while True:
        logging.info('####################################')
        logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx)

        with _mysql_connection(args) as conn:
            task_references = _compute_task_references(
                    conn, tko_job_idx, args.batch_size)
        if not task_references:
            logging.info('No more unfilled task references. All done!')
            break

        logging.info(
                'Inserting %d task references. tko_job_ids: %d...%d',
                len(task_references),
                task_references[0].tko_job_idx,
                task_references[-1].tko_job_idx,
        )
        with _mysql_connection(args) as conn:
            _insert_task_references(conn, task_references, args.dryrun)
        if not args.dryrun:
            with _mysql_connection(args) as conn:
                _verify_task_references(conn, task_references)

        tko_job_idx = _next_job_idx(task_references)

        if iterations_left is not None:
            iterations_left -= 1
            if iterations_left <= 0:
                break
            logging.info('%d more iterations left', iterations_left)
        logging.info('Iteration done. Sleeping for %d seconds',
                     args.sleep_seconds)
        time.sleep(args.sleep_seconds)


if __name__ == '__main__':
    main()