# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A tool to compare functions for finding anomalies to the current default.

This tool provides a way to benchmark an anomaly detection algorithm against
the current find_change_points (base) by running simulations and comparing the
results to the base results and to the existing anomalies in the datastore.

Usage:
  1. Run SetupBaseDataForBench() if it hasn't been run yet.

  2. Add an implementation of find_change_points that takes
  (test_entity, chart_series) arguments and returns a list of
  find_change_points.ChangePoint objects.
  See find_change_points_exp.RunFindChangePoints.

  3. Add that function to _EXPERIMENTAL_FUNCTIONS with a key name.

  4. Call BenchFindChangePoints(name, description) to add a bench job, where
  name is one of the keys in _EXPERIMENTAL_FUNCTIONS. Name and description
  must be unique for each run. The bench results are logged in quick_logger at:
  chromeperf.appspot.com/get_logs?namespace=bench_find_anomalies&name=report
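
  For example, once the base data is set up (the name below is one of the
  keys defined in _EXPERIMENTAL_FUNCTIONS; the description is arbitrary):

    BenchFindChangePoints('steppiness_0_4', 'Lower steppiness threshold')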

If you want to clear the base data, you can run DeleteAllTestBenchEntities().

Results:
  Invalid alerts: Number of change points found by the experimental function
      which correspond to invalid alerts, over total invalid alerts.
  Confirmed alerts: Number of change points found by the experimental function
      which correspond to alerts the sheriff filed a bug for, over the total
      number of alerts with bug ID.
  New alerts: Number of alerts found by the experimental function that the base
      find_change_points algorithm did not find.
  Total alerts: Total change points found by the experimental function,
      over the total number of base alerts.
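
  For example, "Invalid alerts: 2/10" means the experimental function
  re-found 2 of the 10 alerts that were marked invalid; a good function
  re-finds few invalid alerts and many confirmed alerts.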
38"""
39
40import logging
41
42from pipeline import common as pipeline_common
43from pipeline import pipeline
44
45from google.appengine.api import app_identity
46from google.appengine.ext import deferred
47from google.appengine.ext import ndb
48
49from dashboard import debug_alert
50from dashboard import find_change_points
51from dashboard import find_change_points_exp
52from dashboard import layered_cache
53from dashboard import quick_logger
54from dashboard import utils
55from dashboard.models import anomaly
56from dashboard.models import anomaly_config
57from dashboard.models import graph_data
58
59_TASK_QUEUE_NAME = 'find-anomalies-bench-queue'
60
61_FIND_ANOMALIES_BENCH_CACHE_KEY = 'find-anomalies-bench'
62
63# Bench name to path of allowable find anomalies function to benchmark.
64
_EXPERIMENTAL_FUNCTIONS = {
    'find_change_points_default': find_change_points_exp.RunFindChangePoints,
    'steppiness_0_3': lambda test, series:
                      find_change_points_exp.RunFindChangePoints(
                          test, series, min_steppiness=0.3),
    'steppiness_0_4': lambda test, series:
                      find_change_points_exp.RunFindChangePoints(
                          test, series, min_steppiness=0.4),
    'steppiness_0_5': lambda test, series:
                      find_change_points_exp.RunFindChangePoints(
                          test, series, min_steppiness=0.5),
    'steppiness_0_6': lambda test, series:
                      find_change_points_exp.RunFindChangePoints(
                          test, series, min_steppiness=0.6),
}
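
# To benchmark another variant, add an entry like the following (the
# 'steppiness_0_7' key and its parameter value here are hypothetical):
#     'steppiness_0_7': lambda test, series:
#                       find_change_points_exp.RunFindChangePoints(
#                           test, series, min_steppiness=0.7),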


_TEST_DATA_SHERIFF = 'Chromium Perf Sheriff'

# 3000 tests and 300 rows take about 3 hours to run SetupBaseDataForBench.
_NUM_TEST_TO_BENCH = 3000

# 300 rows takes about 5 minutes to run find_change_points per task queue
# task. (The App Engine limit is 10 minutes.)
_NUM_ROWS_TO_BENCH = 300

# This is the window size, in points before and after the Anomaly. If an
# Anomaly's end revision overlaps another Anomaly's window, they are
# considered the same Anomaly.
_MAX_SEGMENT_SIZE_AROUND_ANOMALY = 4

_REPORT_TEMPLATE = """%(bench_name)s: %(description)s
 Invalid alerts: %(invalid_alerts)s
 Confirmed alerts: %(confirmed_alerts)s
 New alerts: %(new_alerts)s
 Total alerts: %(total_alerts)s

 "Unconfirmed" alerts, i.e. "valid" alerts that were not found by
 the experimental function:
 %(unconfirmed_alert_links)s

 "Extra" alerts, i.e. new alerts found by the experimental function
 that weren't found before:
 %(extra_alert_links)s
"""


class TestBench(ndb.Model):
  """Reference anomaly data for one Test."""

  # Test key.
  test = ndb.KeyProperty()

  # List of tuples of (x_value, y_value) for test.
  data_series = ndb.PickleProperty()

  # List of lists of revisions around Anomaly entities from base run.
  base_anomaly_revs = ndb.PickleProperty()

  # List of lists of revisions around Anomaly entities marked invalid.
  invalid_anomaly_revs = ndb.PickleProperty()

  # List of lists of revisions around Anomaly entities with bug IDs.
  confirmed_anomaly_revs = ndb.PickleProperty()


class SimulateAlertProcessingPipeline(pipeline.Pipeline):

  def run(self, bench_name, test_bench_id):  # pylint: disable=invalid-name
    """Runs one experimental alerting function for one TestBench entity.

    Args:
      bench_name: A string bench name.
      test_bench_id: Integer ID of a TestBench entity.

    Returns:
      A pair (TestBench ID, list of ChangePoint results). If the Test no
      longer exists, returns (None, None).
    """
    all_change_points = []
    test_bench = TestBench.get_by_id(test_bench_id)
    test = test_bench.test.get()
    # If the test doesn't exist anymore, just remove this TestBench entity.
    if not test:
      test_bench.key.delete()
      return None, None

    # Clear the last_alerted_revision property because it will be used in
    # the experimental alerting function.
    test.last_alerted_revision = None
    data_series = test_bench.data_series
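    # Simulate alert processing: feed the series to the experimental
    # function one point at a time, as the points would have arrived, and
    # collect every change point reported along the way.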
    find_change_points_func = _EXPERIMENTAL_FUNCTIONS[bench_name]
    for i in xrange(1, len(data_series)):
      change_points = find_change_points_func(test, data_series[0:i])
      change_points = [c for c in change_points if _IsRegression(c, test)]
      all_change_points.extend(change_points)
    logging.debug('Completed alert processing simulation task for bench_name: '
                  '%s, bench_id: %s.', bench_name, test_bench_id)
    return test_bench_id, all_change_points


class GenerateComparisonReportPipeline(pipeline.Pipeline):

  def run(  # pylint: disable=invalid-name
      self, bench_name, description, simulation_results):
    """Generates a comparison report between experimental and base results.

    Args:
      bench_name: A string bench name.
      description: A string description of this bench job.
      simulation_results: A list of pairs, each (TestBench ID, change point
          results), i.e. the return value of
          SimulateAlertProcessingPipeline.run. Note that the ChangePoint
          results, which are namedtuples, arrive here as plain lists because
          pipeline results are implicitly serialized as JSON.
    """
    bench_id_to_change_points_as_lists = dict(simulation_results)
    results = {
        'bench_name': bench_name,
        'description': description,
    }
    total_invalid_alerts = 0
    total_confirmed_alerts = 0
    total_new_alerts = 0
    total_alerts = 0
    total_base_alerts = 0
    total_base_invalid_alerts = 0
    total_base_confirmed_alerts = 0

    unconfirmed_alert_links = []
    extra_alert_links = []

    for bench in TestBench.query().fetch():
      bench_id = bench.key.integer_id()
      if bench_id not in bench_id_to_change_points_as_lists:
        continue
      change_points_as_lists = bench_id_to_change_points_as_lists[bench_id]
      invalid_anomaly_rev_set = _Flatten(bench.invalid_anomaly_revs)
      confirmed_anomaly_rev_set = _Flatten(bench.confirmed_anomaly_revs)
      base_anomaly_rev_set = _Flatten(bench.base_anomaly_revs)
      unconfirmed_alert_links.extend(
          _UnconfirmedAlertLinks(bench, change_points_as_lists))
      extra_alert_links.extend(
          _ExtraAlertLinks(bench, change_points_as_lists))

      for change_point_as_list in change_points_as_lists:
        change_point = find_change_points.ChangePoint(*change_point_as_list)
        end_rev = change_point.x_value
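        # Classify the change point by which set of previously-seen anomaly
        # revisions its end revision falls in.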
        if end_rev in invalid_anomaly_rev_set:
          total_invalid_alerts += 1
        elif end_rev in confirmed_anomaly_rev_set:
          total_confirmed_alerts += 1
        elif end_rev not in base_anomaly_rev_set:
          total_new_alerts += 1

      total_alerts += len(change_points_as_lists)
      total_base_alerts += len(bench.base_anomaly_revs)
      total_base_invalid_alerts += len(bench.invalid_anomaly_revs)
      total_base_confirmed_alerts += len(bench.confirmed_anomaly_revs)

    results['invalid_alerts'] = (
        '%s/%s' % (total_invalid_alerts, total_base_invalid_alerts))
    results['confirmed_alerts'] = (
        '%s/%s' % (total_confirmed_alerts, total_base_confirmed_alerts))
    results['new_alerts'] = total_new_alerts
    results['total_alerts'] = '%s/%s' % (total_alerts, total_base_alerts)
    results['unconfirmed_alert_links'] = '\n'.join(
        unconfirmed_alert_links[:10])
    results['extra_alert_links'] = '\n'.join(
        extra_alert_links[:10])

    _AddReportToLog(results)

    logging.debug('Completed comparison report for bench_name: %s, '
                  'description: %s. Results: %s', bench_name, description,
                  results)


def _UnconfirmedAlertLinks(bench, change_points_as_lists):
  """Makes a list of URLs to view graphs for "unconfirmed" alerts.

  Here, "unconfirmed" alerts refers to alerts that are in the TestBench
  object (i.e. they were found before and "confirmed") but were not found
  by the experimental find-anomalies function -- they were not "confirmed"
  again by the experimental function, so I'm calling them "unconfirmed".

  Below, bench.confirmed_anomaly_revs is a list of lists of revisions *around*
  a confirmed alert. For example, if an alert was found before at revisions
  200 and 300, this list might look like: [[199, 200, 201], [299, 300, 301]].

  Thus, the "unconfirmed" alerts are represented by the central revision of
  each group in which the experimental function found no corresponding alert.

  Ideally, for a good experimental function, we're hoping that these
  "unconfirmed" alerts are all cases where sheriffs triaged the alert wrong
  and it was actually invalid.

  Args:
    bench: One TestBench entity.
    change_points_as_lists: List of lists (which are JSONified ChangePoints).

  Returns:
    A list of URLs, each of which is for a graph for one unconfirmed alert.
  """
  anomaly_revs = {c[0] for c in change_points_as_lists}
  unconfirmed_revs = []
  for confirmed_rev_group in bench.confirmed_anomaly_revs:
    if not anomaly_revs.intersection(confirmed_rev_group):
      # The alert for this confirmed rev group is "unconfirmed" by the
      # experimental function, so it should be added to the list.
      mid_index = len(confirmed_rev_group) / 2
      unconfirmed_revs.append(confirmed_rev_group[mid_index])
  return [_GraphLink(bench.test, rev) for rev in unconfirmed_revs]


def _ExtraAlertLinks(bench, change_points_as_lists):
  """Makes a list of links to view "extra" alerts found.

  Here, an "extra" alert means an alert that was found by the experimental
  function but doesn't coincide with any Anomaly in the datastore, regardless
  of whether that Anomaly would be found by the current default alerting
  function.

  Args:
    bench: A TestBench entity.
    change_points_as_lists: List of lists (which are JSONified ChangePoints).

  Returns:
    A list of URLs, each of which is for a graph for one extra alert.
  """
  anomaly_revs = {c[0] for c in change_points_as_lists}
  confirmed_revs = _Flatten(bench.confirmed_anomaly_revs)
  invalid_revs = _Flatten(bench.invalid_anomaly_revs)
  # Both "confirmed revs" and "invalid revs" are previously fired alerts.
  extra_revs = anomaly_revs.difference(confirmed_revs, invalid_revs)
  return [_GraphLink(bench.test, rev) for rev in extra_revs]


def _GraphLink(test_key, rev):
  """Returns an HTML link to view the graph for an alert."""
  test_path = utils.TestPath(test_key)
  master, bot, test = test_path.split('/', 2)
  query = '?masters=%s&bots=%s&tests=%s&rev=%s' % (master, bot, test, rev)
  return '<a href="https://%s/report%s">%s/%s@%s</a>' % (
      app_identity.get_default_version_hostname(), query, bot, test, rev)


class RunExperimentalChunkPipeline(pipeline.Pipeline):

  def run(self, bench_name, test_bench_ids):  # pylint: disable=invalid-name
    """Runs the experimental find_change_points on each TestBench entity.

    This runs SimulateAlertProcessingPipeline in parallel and returns a list
    of the combined results.

    Args:
      bench_name: A string bench name.
      test_bench_ids: List of TestBench IDs.

    Yields:
      Pipeline instance.
    """
    results = []
    for bench_id in test_bench_ids:
      result_future = yield SimulateAlertProcessingPipeline(
          bench_name, bench_id)
      results.append(result_future)
    yield pipeline_common.List(*results)


class RunExperimentalPipeline(pipeline.Pipeline):

  def run(self, bench_name, description):  # pylint: disable=invalid-name
    """The root pipeline that starts simulation tasks and generates a report.

    This spawns tasks that in turn spawn the simulation tasks, then runs the
    report-generation task on the aggregated results.

    Args:
      bench_name: A string bench name.
      description: A string description of this bench job.

    Yields:
      Pipeline instance.
    """
    test_bench_keys = TestBench.query().fetch(keys_only=True)
    test_bench_ids = [k.integer_id() for k in test_bench_keys]

    results = []
    # Number of task queue tasks to spawn per pipeline.
    pipeline_chunk_size = 1000
    for i in xrange(0, len(test_bench_ids), pipeline_chunk_size):
      id_chunk = test_bench_ids[i:i + pipeline_chunk_size]
      result_future = yield RunExperimentalChunkPipeline(
          bench_name, id_chunk)
      results.append(result_future)

    combined_results = yield pipeline_common.Extend(*results)
    yield GenerateComparisonReportPipeline(
        bench_name, description, combined_results)


def SetupBaseDataForBench():
  """Adds tasks to queue to create base data for bench."""
  if TestBench.query().fetch(keys_only=True, limit=1):
    raise Exception('Base data already exists.')

  # This will take a while, so we do it in a task queue.
  deferred.defer(_AddCreateTestBenchTasks, _queue=_TASK_QUEUE_NAME)


def BenchFindChangePoints(bench_name, description):
  """Submits a bench job for a bench_name and description.

  Requires an implementation of find_change_points added to
  _EXPERIMENTAL_FUNCTIONS. The (bench_name, description) pair must be
  unique for each job.

  Args:
    bench_name: A string bench name which must be one of the keys of
        _EXPERIMENTAL_FUNCTIONS.
    description: A string description of this bench job.

  Raises:
    ValueError: The input was not valid.
    Exception: Not enough data available.
  """
  if bench_name not in _EXPERIMENTAL_FUNCTIONS:
    raise ValueError('%s is not a valid find anomalies bench function.' %
                     bench_name)

  bench_key = '%s.%s' % (bench_name, description)
  submitted_benches = layered_cache.Get(_FIND_ANOMALIES_BENCH_CACHE_KEY)
  if not submitted_benches:
    submitted_benches = {}
  if bench_key in submitted_benches:
    raise ValueError('Bench job for "%s. %s" was already submitted.' %
                     (bench_name, description))

  submitted_benches[bench_key] = True
  layered_cache.Set(_FIND_ANOMALIES_BENCH_CACHE_KEY, submitted_benches)

  # Check whether base bench data exists.
  if not TestBench.query().fetch(keys_only=True, limit=1):
    raise Exception('No base data available to bench against.')

  # Add to task queue to run the simulation.
  stage = RunExperimentalPipeline(bench_name, description)
  stage.start(queue_name=_TASK_QUEUE_NAME)


def DeleteAllTestBenchEntities():
  """Deletes all TestBench data."""
  ndb.delete_multi(TestBench.query().fetch(keys_only=True))


def _AddCreateTestBenchTasks():
  """Adds _CreateTestBench tasks to queue."""
  sheriff_key = ndb.Key('Sheriff', _TEST_DATA_SHERIFF)
  query = graph_data.Test.query(
      graph_data.Test.sheriff == sheriff_key,
      graph_data.Test.has_rows == True,
      graph_data.Test.deprecated == False)

  tests = query.fetch(limit=_NUM_TEST_TO_BENCH)

  tests = [t for t in tests if _GetSheriffForTest(t) and not _IsRefBuild(t)]
  for test in tests:
    deferred.defer(_CreateTestBench, test.key, _queue=_TASK_QUEUE_NAME)


def _CreateTestBench(test_key):
  """Fetches and stores test and row data to be used to run the bench."""
  # Get Row entities.
  query = graph_data.Row.query(projection=['revision', 'value'])
  query = query.filter(graph_data.Row.parent_test == test_key)
  query = query.order(-graph_data.Row.revision)
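  # The query returns the newest rows first; reversing them puts the series
  # in ascending order by revision.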
  rows = list(reversed(query.fetch(limit=_NUM_ROWS_TO_BENCH)))
  data_series = [(row.revision, row.value) for row in rows]

  # Add TestBench entity.
  test_bench = TestBench(test=test_key, data_series=data_series)
  _UpdateInvalidAndConfirmedAnomalyRevs(test_bench)
  _RunBaseAlertProcessing(test_bench)
  test_bench.put()


def _AddReportToLog(report_dict):
  """Adds a log for bench results."""
  report = _REPORT_TEMPLATE % report_dict
  formatter = quick_logger.Formatter()
  logger = quick_logger.QuickLogger(
      'bench_find_anomalies', 'report', formatter)
  logger.Log(report)
  logger.Save()


def _Flatten(list_of_list):
  """Creates a set of all items in the sublists."""
  flattened = set()
  for item in list_of_list:
    flattened.update(item)
  return flattened


def _UpdateInvalidAndConfirmedAnomalyRevs(test_bench):
  """Updates TestBench entity with invalid and confirmed anomaly revs."""

  # The start rev for getting Anomalies should be at min_segment_size.
  test = test_bench.test.get()
  config_dict = anomaly_config.GetAnomalyConfigDict(test)
  min_segment_size = config_dict.get(
      'min_segment_size', find_change_points.MIN_SEGMENT_SIZE)
  start_index = min(min_segment_size, len(test_bench.data_series)) - 1
  start_rev = test_bench.data_series[start_index][0]

  query = anomaly.Anomaly.query(anomaly.Anomaly.test == test_bench.test)
  anomalies = query.fetch()
  anomalies.sort(key=lambda a: a.end_revision)
  anomalies = [a for a in anomalies if a.end_revision >= start_rev and
               not a.is_improvement]

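  # By dashboard convention, bug_id == -1 means the alert was triaged as
  # invalid, and a positive bug_id means a bug was filed (confirmed).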
  test_bench.invalid_anomaly_revs = [
      _GetRevsAroundRev(test_bench.data_series, a.end_revision)
      for a in anomalies if a.bug_id == -1]
  test_bench.confirmed_anomaly_revs = [
      _GetRevsAroundRev(test_bench.data_series, a.end_revision)
      for a in anomalies if a.bug_id > 0]


def _RunBaseAlertProcessing(test_bench):
  """Runs base alert processing simulation on a TestBench entity.

  This function runs the current find_change_points.FindChangePoints
  implementation and saves the revisions around the found anomalies to
  a TestBench entity.

  Args:
    test_bench: A TestBench entity.
  """
  test = test_bench.test.get()
  config_dict = anomaly_config.GetAnomalyConfigDict(test)
  change_points = debug_alert.SimulateAlertProcessing(
      test_bench.data_series, **config_dict)

  test_bench.base_anomaly_revs = [
      _GetRevsAroundRev(test_bench.data_series, change_point.x_value)
      for change_point in change_points if _IsRegression(change_point, test)]


def _GetRevsAroundRev(data_series, revision):
  """Gets a list of revisions from before to after a given revision.

  Args:
    data_series: A list of (revision, value) pairs.
    revision: A revision number.

  Returns:
    A list of revisions.
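
  For example, with _MAX_SEGMENT_SIZE_AROUND_ANOMALY = 4 and a data_series
  covering revisions 1 through 10, revision 6 yields
  [2, 3, 4, 5, 6, 7, 8, 9, 10].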
525  """
526  if not _MAX_SEGMENT_SIZE_AROUND_ANOMALY:
527    return [revision]
528
529  middle_index = 0
530  for i in xrange(len(data_series)):
531    if data_series[i][0] == revision:
532      middle_index = i
533      break
534  start_index = max(0, middle_index - _MAX_SEGMENT_SIZE_AROUND_ANOMALY)
535  end_index = middle_index + _MAX_SEGMENT_SIZE_AROUND_ANOMALY + 1
536  series_around_rev = data_series[start_index:end_index]
537  return [s[0] for s in series_around_rev]
538
539
540def _IsRefBuild(test):
541  """Returns True if test is a reference build."""
542  key_path = test.key.string_id()
543  return key_path[-1] == 'ref' or key_path[-1].endswith('_ref')
544
545
546def _GetSheriffForTest(test):
547  """Gets the Sheriff for a test, or None if no sheriff."""
548  if test.sheriff:
549    return test.sheriff.get()
550  return None
551
552
553def _IsRegression(change_point, test):
554  """Returns whether the alert is a regression for the given test.
555
556  Args:
557    change_point: A find_change_points.ChangePoint object.
558    test: Test to get the regression direction for.
559
560  Returns:
561    True if it is a regression anomaly, otherwise False.
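
  For example, a rise in the median on a test whose improvement direction
  is DOWN (lower is better) is a regression, so this returns True.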
562  """
563  median_before = change_point.median_before
564  median_after = change_point.median_after
565  if (median_before < median_after and
566      test.improvement_direction == anomaly.UP):
567    return False
568  if (median_before >= median_after and
569      test.improvement_direction == anomaly.DOWN):
570    return False
571  return True
572