1"""A simple script to backfill tko_task_references table with throttling."""
2
3from __future__ import absolute_import
4from __future__ import division
5from __future__ import print_function
6
7import argparse
8import collections
9import contextlib
10import logging
11import time
12
13import MySQLdb
14
15
class BackfillException(Exception):
  """Raised when the backfill cannot proceed.

  Used when no starting tko_job_idx can be found and when an inserted batch
  fails verification.
  """
18
19
20def _parse_args():
21  parser = argparse.ArgumentParser(
22      description=__doc__)
23  parser.add_argument('--host', required=True, help='mysql server host')
24  parser.add_argument('--user', required=True, help='mysql server user')
25  parser.add_argument('--password', required=True, help='mysql server password')
26  parser.add_argument('--dryrun', action='store_true', default=False)
27  parser.add_argument(
28      '--num-iterations',
29      default=None,
30      type=int,
31      help='If set, total number of iterations. Default is no limit.',
32  )
33  parser.add_argument(
34      '--batch-size',
35      default=1000,
36      help='Number of tko_jobs rows to read in one iteration',
37  )
38  parser.add_argument(
39      '--sleep-seconds',
40      type=int,
41      default=1,
42      help='Time to sleep between iterations',
43  )
44
45  args = parser.parse_args()
46  if args.dryrun:
47    if not args.num_iterations:
48      logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.')
49      args.num_iterations = 5
50  return args
51
52
53
@contextlib.contextmanager
def _mysql_connection(args):
  """Open a MySQL connection to chromeos_autotest_db and close it on exit.

  Args:
    args: parsed command line; its host, user and password are used.

  Yields:
    An open MySQLdb connection with chromeos_autotest_db selected.
  """
  conn = MySQLdb.connect(user=args.user, host=args.host, passwd=args.password)
  try:
    # Select the database inside the try so the connection is still closed
    # if the USE statement itself fails.
    with _mysql_cursor(conn) as c:
      c.execute('USE chromeos_autotest_db;')
    yield conn
  finally:
    conn.close()
63
64
65@contextlib.contextmanager
66def _autocommit(conn):
67  try:
68    yield conn
69  except:
70    conn.rollback()
71  else:
72    conn.commit()
73
74
75@contextlib.contextmanager
76def _mysql_cursor(conn):
77  c = conn.cursor()
78  try:
79    yield c
80  finally:
81    c.close()
82
83
def _latest_unfilled_job_idx(conn):
  """Return the tko_job_idx the backfill should start from.

  If tko_task_references already has rows, the backfill resumes one below the
  smallest tko_job_idx stored there. If it is empty, the backfill starts at
  the largest job_idx in tko_jobs.

  Args:
    conn: open MySQLdb connection.

  Returns:
    The starting tko_job_idx as a string (in both cases), or None if
    tko_task_references and tko_jobs are both empty.
  """
  with _mysql_cursor(conn) as c:
    c.execute("""
SELECT tko_job_idx
FROM tko_task_references
ORDER BY tko_job_idx
LIMIT 1
;""")
    r = c.fetchall()
  if r:
    # int() rather than long(): long does not exist in Python 3, and Python 2
    # int() promotes to long on its own.
    return str(int(r[0][0]) - 1)
  logging.debug('tko_task_references is empty.'
                ' Grabbing the latest tko_job_idx to fill.')
  with _mysql_cursor(conn) as c:
    c.execute("""
SELECT job_idx
FROM tko_jobs
ORDER BY job_idx DESC
LIMIT 1
;""")
    r = c.fetchall()
  if r:
    # Return a string here too, so both branches have the same type.
    return str(r[0][0])
  return None
108
109
110_TKOTaskReference = collections.namedtuple(
111    '_TKOTaskReference',
112    ['tko_job_idx', 'task_reference', 'parent_task_reference'],
113)
114
# Reads up to %(batch_size)s tko_jobs rows with job_idx <= %(latest_job_idx)s,
# newest first. Both placeholders are filled with Python %-formatting (not
# driver parameters), so callers must only substitute trusted numeric values.
_SQL_SELECT_TASK_REFERENCES = """
SELECT job_idx, afe_job_id, afe_parent_job_id
FROM tko_jobs
WHERE job_idx <= %(latest_job_idx)s
ORDER BY job_idx DESC
LIMIT %(batch_size)s
;"""
# Multi-row insert; %(values)s is replaced by a comma-separated list of
# (reference_type, tko_job_idx, task_id, parent_task_id) row tuples.
_SQL_INSERT_TASK_REFERENCES = """
INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id)
VALUES %(values)s
;"""
# Looks up tko_task_references rows for one tko_job_idx; used to spot-check
# that an insert landed.
_SQL_SELECT_TASK_REFERENCE = """
SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s
;"""
129
130
def _compute_task_references(conn, latest_job_idx, batch_size):
  """Read the next batch of task references from tko_jobs.

  Args:
    conn: open MySQLdb connection.
    latest_job_idx: highest tko_jobs.job_idx to include (int or numeric str).
    batch_size: maximum number of rows to read (int or numeric str).

  Returns:
    A list of _TKOTaskReference ordered by tko_job_idx descending. The list is
    empty when no tko_jobs rows remain at or below latest_job_idx.

  Raises:
    ValueError: if latest_job_idx or batch_size is not an integer.
  """
  # Both values are %-formatted straight into the SQL text, so force them to
  # integers first. batch_size in particular may be a string, e.g. one taken
  # directly from the command line.
  sql = _SQL_SELECT_TASK_REFERENCES % {
      'latest_job_idx': int(latest_job_idx),
      'batch_size': int(batch_size),
  }
  with _mysql_cursor(conn) as c:
    c.execute(sql)
    rows = c.fetchall()
  # Each row is (job_idx, afe_job_id, afe_parent_job_id), per the SELECT.
  return [_TKOTaskReference(*row) for row in rows or ()]
143
144
def _insert_task_references(conn, task_references, dryrun):
  """Insert one 'afe' tko_task_references row per task reference.

  Row values are passed to the driver as query parameters instead of being
  pasted into the SQL text. The driver therefore escapes them, and a missing
  AFE id (None) is stored as NULL rather than the literal string "None".

  Args:
    conn: open MySQLdb connection.
    task_references: sequence of _TKOTaskReference to insert.
    dryrun: if True, only log the statement that would run. Nothing is
      executed against the database.
  """
  if not task_references:
    # "VALUES" followed by nothing is invalid SQL, and there is nothing to do.
    return

  def _as_text(value):
    # Ids are sent as text, as the quoted literals used to be, but None must
    # become NULL.
    return None if value is None else str(value)

  placeholders = ', '.join(["('afe', %s, %s, %s)"] * len(task_references))
  sql = _SQL_INSERT_TASK_REFERENCES % {'values': placeholders}
  params = []
  for tr in task_references:
    params.extend((tr.tko_job_idx,
                   _as_text(tr.task_reference),
                   _as_text(tr.parent_task_reference)))

  if dryrun:
    if len(sql) < 200:
      sql_log = sql
    else:
      sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
    logging.debug('Would have run: %s', sql_log)
    logging.debug('DRYRUN: %d rows, first: %s, last: %s',
                  len(task_references), params[:3], params[-3:])
    # A dry run must not write anything.
    return

  with _autocommit(conn):
    with _mysql_cursor(conn) as c:
      c.execute(sql, tuple(params))
161
162
163def _verify_task_references(conn, task_references):
164  # Just verify that the last one was inserted.
165  if not task_references:
166    return
167  tko_job_idx = task_references[-1].tko_job_idx
168  sql = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': tko_job_idx}
169  with _mysql_cursor(conn) as c:
170    c.execute(sql)
171    r = c.fetchall()
172    if not r or r[0][0] != tko_job_idx:
173      raise BackfillException(
174          'Failed to insert task reference for tko_job_id %s' % tko_job_idx)
175
176
177def _next_job_idx(task_references):
178  return str(long(task_references[-1].tko_job_idx) - 1)
179
def main():
  """Backfill tko_task_references in throttled batches, newest job first.

  Each step opens a fresh connection and reads up to --batch-size tko_jobs
  rows at or below the current tko_job_idx. It inserts their task references
  and, outside --dryrun, verifies the insert. It then moves below the last row
  processed and sleeps for --sleep-seconds.

  Raises:
    BackfillException: if the starting tko_job_idx cannot be determined, or
      if verification of an inserted batch fails.
  """
  logging.basicConfig(level=logging.DEBUG)
  args = _parse_args()

  with _mysql_connection(args) as conn:
    next_idx = _latest_unfilled_job_idx(conn)
    if next_idx is None:
      raise BackfillException('Failed to get last unfilled tko_job_idx')
    logging.info('First tko_job_idx to fill: %s', next_idx)

  # None means "no iteration limit".
  remaining = args.num_iterations
  while True:
    logging.info('####################################')
    logging.info('Start backfilling from tko_job_idx: %s', next_idx)

    with _mysql_connection(args) as conn:
      batch = _compute_task_references(conn, next_idx, args.batch_size)
    if not batch:
      logging.info('No more unfilled task references. All done!')
      return

    first, last = batch[0], batch[-1]
    logging.info(
        'Inserting %d task references. tko_job_ids: %d...%d',
        len(batch), first.tko_job_idx, last.tko_job_idx)
    with _mysql_connection(args) as conn:
      _insert_task_references(conn, batch, args.dryrun)
    if not args.dryrun:
      with _mysql_connection(args) as conn:
        _verify_task_references(conn, batch)

    next_idx = _next_job_idx(batch)

    if remaining is not None:
      remaining -= 1
      if remaining <= 0:
        return
      logging.info('%d more iterations left', remaining)
    logging.info('Iteration done. Sleeping for %d seconds', args.sleep_seconds)
    time.sleep(args.sleep_seconds)
222
223
if __name__ == '__main__':
  # Run the backfill only when executed as a script, not when imported.
  main()
226