# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A tool to compare functions for finding anomalies to the current default.

This tool provides a way to benchmark an anomaly detection algorithm against
the current find_change_points (base) by running simulations and comparing the
results to the base results and to the existing anomalies in the datastore.

Usage:
 1. Run SetupBaseDataForBench() if it has not been run yet.

 2. Add an implementation of find_change_points that takes
 (test_entity, chart_series) arguments and returns a list of
 find_change_points.ChangePoint objects.
 See find_change_points_exp.RunFindChangePoints.

 3. Add that function to _EXPERIMENTAL_FUNCTIONS under a new key name.

 4. Call BenchFindChangePoints(name, description) to add a bench job, where
 name is one of the keys in _EXPERIMENTAL_FUNCTIONS. The name/description
 pair must be unique for each run. The bench results are logged in
 quick_logger at:
 chromeperf.appspot.com/get_logs?namespace=bench_find_anomalies&name=report

There is an illustrative sketch of these steps below _EXPERIMENTAL_FUNCTIONS.

If you want to clear the base data, you can run DeleteAllTestBenchEntities().

Results:
  Invalid alerts: Number of change points found by the experimental function
      which correspond to invalid alerts, over the total number of invalid
      alerts.
  Confirmed alerts: Number of change points found by the experimental function
      which correspond to alerts the sheriff filed a bug for, over the total
      number of alerts with a bug ID.
  New alerts: Number of alerts found by the experimental function that the
      base find_change_points algorithm did not find.
  Total alerts: Total number of change points found by the experimental
      function, over the total number of base alerts.
"""

import logging

from pipeline import common as pipeline_common
from pipeline import pipeline

from google.appengine.api import app_identity
from google.appengine.ext import deferred
from google.appengine.ext import ndb

from dashboard import debug_alert
from dashboard import find_change_points
from dashboard import find_change_points_exp
from dashboard import layered_cache
from dashboard import quick_logger
from dashboard import utils
from dashboard.models import anomaly
from dashboard.models import anomaly_config
from dashboard.models import graph_data

_TASK_QUEUE_NAME = 'find-anomalies-bench-queue'

_FIND_ANOMALIES_BENCH_CACHE_KEY = 'find-anomalies-bench'

# Maps bench name to an allowed find-anomalies function to benchmark.
_EXPERIMENTAL_FUNCTIONS = {
    'find_change_points_default': find_change_points_exp.RunFindChangePoints,
    'steppiness_0_3': lambda test, series:
        find_change_points_exp.RunFindChangePoints(
            test, series, min_steppiness=0.3),
    'steppiness_0_4': lambda test, series:
        find_change_points_exp.RunFindChangePoints(
            test, series, min_steppiness=0.4),
    'steppiness_0_5': lambda test, series:
        find_change_points_exp.RunFindChangePoints(
            test, series, min_steppiness=0.5),
    'steppiness_0_6': lambda test, series:
        find_change_points_exp.RunFindChangePoints(
            test, series, min_steppiness=0.6),
}
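

# The function below is an illustrative sketch only and is not called anywhere
# in this module. It walks through the usage steps from the module docstring
# for a hypothetical new candidate function; the 'steppiness_0_7' key, the 0.7
# threshold and the description string are made-up examples, not existing
# entries in _EXPERIMENTAL_FUNCTIONS.
def _ExampleAddAndBenchExperimentalFunction():
  """Registers a hypothetical candidate function and submits a bench job."""
  _EXPERIMENTAL_FUNCTIONS['steppiness_0_7'] = (
      lambda test, series: find_change_points_exp.RunFindChangePoints(
          test, series, min_steppiness=0.7))
  if not TestBench.query().fetch(keys_only=True, limit=1):
    SetupBaseDataForBench()  # One-time setup; runs on the task queue.
  BenchFindChangePoints('steppiness_0_7', 'Example: steppiness threshold 0.7')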


_TEST_DATA_SHERIFF = 'Chromium Perf Sheriff'

# 1000 tests and 300 rows take about 3 hours to run SetupBaseDataForBench.
_NUM_TEST_TO_BENCH = 3000

# 250 rows take about 5 minutes to run find_change_points per task queue task.
# (The App Engine limit is 10 minutes.)
_NUM_ROWS_TO_BENCH = 300

# Size of the window of points before and after an Anomaly. If an Anomaly's
# end revision falls within another Anomaly's window, they are considered the
# same Anomaly.
_MAX_SEGMENT_SIZE_AROUND_ANOMALY = 4

_REPORT_TEMPLATE = """%(bench_name)s: %(description)s
 Invalid alerts: %(invalid_alerts)s
 Confirmed alerts: %(confirmed_alerts)s
 New alerts: %(new_alerts)s
 Total alerts: %(total_alerts)s

 "Unconfirmed" alerts, i.e. "valid" alerts that were not found by
 the experimental function:
 %(unconfirmed_alert_links)s

 "Extra" alerts, i.e. new alerts found by the experimental function
 that weren't found before:
 %(extra_alert_links)s
"""


class TestBench(ndb.Model):
  """Reference anomaly data for one Test."""

  # Test key.
  test = ndb.KeyProperty()

  # List of (x_value, y_value) tuples for the test.
  data_series = ndb.PickleProperty()

  # List of lists of revisions around Anomaly entities from the base run.
  base_anomaly_revs = ndb.PickleProperty()

  # List of lists of revisions around Anomaly entities marked invalid.
  invalid_anomaly_revs = ndb.PickleProperty()

  # List of lists of revisions around Anomaly entities with bug IDs.
  confirmed_anomaly_revs = ndb.PickleProperty()


class SimulateAlertProcessingPipeline(pipeline.Pipeline):

  def run(self, bench_name, test_bench_id):  # pylint: disable=invalid-name
    """Runs one experimental alerting function for one TestBench entity.

    Args:
      bench_name: A string bench name.
      test_bench_id: Integer ID of a TestBench entity.

    Returns:
      A pair (TestBench ID, list of find_change_points.ChangePoint results).
      If the Test no longer exists, this returns (None, None).
    """
    all_change_points = []
    test_bench = TestBench.get_by_id(test_bench_id)
    test = test_bench.test.get()
    # If the test doesn't exist anymore, just remove this TestBench entity.
    if not test:
      test_bench.key.delete()
      return None, None

    # Clear the last_alerted_revision property because it will be used in
    # the experimental alerting function.
    test.last_alerted_revision = None
    data_series = test_bench.data_series
    for i in xrange(1, len(data_series)):
      find_change_points_func = _EXPERIMENTAL_FUNCTIONS[bench_name]
      change_points = find_change_points_func(test, data_series[0:i])
      change_points = [c for c in change_points if _IsRegression(c, test)]
      all_change_points.extend(change_points)
    logging.debug('Completed alert processing simulation task for bench_name: '
                  '%s, bench_id: %s.', bench_name, test_bench_id)
    return test_bench_id, all_change_points
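

# Note on serialization: the pipeline library JSON-serializes the values
# returned by SimulateAlertProcessingPipeline.run above, so the ChangePoint
# namedtuples arrive in GenerateComparisonReportPipeline below as plain lists
# and are rebuilt there with find_change_points.ChangePoint(*list). A minimal
# sketch of that round trip, using a made-up namedtuple as a stand-in for
# ChangePoint (field names and values are hypothetical):
#
#   import collections
#   import json
#   FakePoint = collections.namedtuple('FakePoint', 'x_value median_before')
#   as_list = json.loads(json.dumps(FakePoint(300, 1.5)))  # -> [300, 1.5]
#   rebuilt = FakePoint(*as_list)  # FakePoint(x_value=300, median_before=1.5)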
class GenerateComparisonReportPipeline(pipeline.Pipeline):

  def run(  # pylint: disable=invalid-name
      self, bench_name, description, simulation_results):
    """Generates a comparison report between experimental and base results.

    Args:
      bench_name: A string bench name.
      description: A string description of this bench job.
      simulation_results: A list of (TestBench ID, change point results)
          pairs, i.e. the return values of
          SimulateAlertProcessingPipeline.run. Note that the ChangePoint
          objects, which are named tuples, arrive here as plain lists because
          they are implicitly serialized as JSON.
    """
    bench_id_to_change_points_as_lists = dict(simulation_results)
    results = {
        'bench_name': bench_name,
        'description': description,
    }
    total_invalid_alerts = 0
    total_confirmed_alerts = 0
    total_new_alerts = 0
    total_alerts = 0
    total_base_alerts = 0
    total_base_invalid_alerts = 0
    total_base_confirmed_alerts = 0

    unconfirmed_alert_links = []
    extra_alert_links = []

    for bench in TestBench.query().fetch():
      bench_id = bench.key.integer_id()
      if bench_id not in bench_id_to_change_points_as_lists:
        continue
      change_points_as_lists = bench_id_to_change_points_as_lists[bench_id]
      invalid_anomaly_rev_set = _Flatten(bench.invalid_anomaly_revs)
      confirmed_anomaly_rev_set = _Flatten(bench.confirmed_anomaly_revs)
      base_anomaly_rev_set = _Flatten(bench.base_anomaly_revs)
      unconfirmed_alert_links.extend(
          _UnconfirmedAlertLinks(bench, change_points_as_lists))
      extra_alert_links.extend(
          _ExtraAlertLinks(bench, change_points_as_lists))

      for change_point_as_list in change_points_as_lists:
        change_point = find_change_points.ChangePoint(*change_point_as_list)
        end_rev = change_point.x_value
        if end_rev in invalid_anomaly_rev_set:
          total_invalid_alerts += 1
        elif end_rev in confirmed_anomaly_rev_set:
          total_confirmed_alerts += 1
        elif end_rev not in base_anomaly_rev_set:
          total_new_alerts += 1

      total_alerts += len(change_points_as_lists)
      total_base_alerts += len(bench.base_anomaly_revs)
      total_base_invalid_alerts += len(bench.invalid_anomaly_revs)
      total_base_confirmed_alerts += len(bench.confirmed_anomaly_revs)

    results['invalid_alerts'] = (
        '%s/%s' % (total_invalid_alerts, total_base_invalid_alerts))
    results['confirmed_alerts'] = (
        '%s/%s' % (total_confirmed_alerts, total_base_confirmed_alerts))
    results['new_alerts'] = total_new_alerts
    results['total_alerts'] = '%s/%s' % (total_alerts, total_base_alerts)
    results['unconfirmed_alert_links'] = '\n'.join(
        unconfirmed_alert_links[:10])
    results['extra_alert_links'] = '\n'.join(
        extra_alert_links[:10])

    _AddReportToLog(results)

    logging.debug('Completed comparison report for bench_name: %s, '
                  'description: %s. Results: %s', bench_name, description,
                  results)


def _UnconfirmedAlertLinks(bench, change_points_as_lists):
  """Makes a list of URLs to view graphs for "unconfirmed" alerts.

  Here, "unconfirmed" alerts refers to alerts that are in the TestBench
  object (i.e. they were found before and "confirmed") but were not found
  by the experimental find-anomalies function -- they were not "confirmed"
  again by the experimental function, so they are called "unconfirmed" here.

  Below, bench.confirmed_anomaly_revs is a list of lists of revisions *around*
  a confirmed alert. For example, if alerts were found before at revisions
  200 and 300, this list might look like: [[199, 200, 201], [299, 300, 301]].

  Thus, each previously "confirmed" alert that was not found again is reported
  as the central revision of a group for which the experimental function found
  no corresponding change points.

  Ideally, for a good experimental function, these "unconfirmed" alerts are
  all cases where the sheriff triaged the alert incorrectly and it was
  actually invalid.

  Args:
    bench: One TestBench entity.
    change_points_as_lists: List of lists (which are JSONified ChangePoints).

  Returns:
    A list of URLs, each of which is for a graph for one unconfirmed alert.
  """
  anomaly_revs = {c[0] for c in change_points_as_lists}
  unconfirmed_revs = []
  for confirmed_rev_group in bench.confirmed_anomaly_revs:
    if not anomaly_revs.intersection(confirmed_rev_group):
      # The alert for this confirmed rev group is "unconfirmed" by the
      # experimental function, so it is added to the list.
      mid_index = len(confirmed_rev_group) / 2
      unconfirmed_revs.append(confirmed_rev_group[mid_index])
  return [_GraphLink(bench.test, rev) for rev in unconfirmed_revs]
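
# A short worked example of the logic above (all values are hypothetical): if
# bench.confirmed_anomaly_revs is [[199, 200, 201], [299, 300, 301]] and the
# experimental change points have x_values {201, 450}, then the first group
# is still confirmed (201 intersects it), the second group is not, and so the
# central revision 300 is reported as an "unconfirmed" alert link.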


def _ExtraAlertLinks(bench, change_points_as_lists):
  """Makes a list of links to view "extra" alerts found.

  Here, an "extra" alert means an alert that was found by the experimental
  function but doesn't coincide with any Anomaly in the datastore, regardless
  of whether that Anomaly would be found by the current default alerting
  function.

  Args:
    bench: A TestBench entity.
    change_points_as_lists: List of lists (which are JSONified ChangePoints).

  Returns:
    A list of URLs, each of which is for a graph for one extra alert.
  """
  anomaly_revs = {c[0] for c in change_points_as_lists}
  confirmed_revs = _Flatten(bench.confirmed_anomaly_revs)
  invalid_revs = _Flatten(bench.invalid_anomaly_revs)
  # Both "confirmed revs" and "invalid revs" are previously-fired alerts.
  extra_revs = anomaly_revs.difference(confirmed_revs, invalid_revs)
  return [_GraphLink(bench.test, rev) for rev in extra_revs]


def _GraphLink(test_key, rev):
  """Returns an HTML link to view the graph for an alert."""
  test_path = utils.TestPath(test_key)
  master, bot, test = test_path.split('/', 2)
  query = '?masters=%s&bots=%s&tests=%s&rev=%s' % (master, bot, test, rev)
  return '<a href="https://%s/report%s">%s/%s@%s</a>' % (
      app_identity.get_default_version_hostname(), query, bot, test, rev)
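
# For illustration, with the hypothetical test path
# 'ChromiumPerf/linux-release/sunspider/Total', revision 12345, and default
# hostname chromeperf.appspot.com, _GraphLink above would produce (shown
# wrapped here; the actual return value is a single line):
#
#   <a href="https://chromeperf.appspot.com/report?masters=ChromiumPerf&
#       bots=linux-release&tests=sunspider/Total&rev=12345">
#       linux-release/sunspider/Total@12345</a>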


class RunExperimentalChunkPipeline(pipeline.Pipeline):

  def run(self, bench_name, test_bench_ids):  # pylint: disable=invalid-name
    """Runs the experimental find_change_points on each TestBench entity.

    This runs SimulateAlertProcessingPipeline in parallel and returns a list
    of the combined results.

    Args:
      bench_name: A string bench name.
      test_bench_ids: List of TestBench IDs.

    Yields:
      Pipeline instance.
    """
    results = []
    for bench_id in test_bench_ids:
      result_future = yield SimulateAlertProcessingPipeline(
          bench_name, bench_id)
      results.append(result_future)
    yield pipeline_common.List(*results)


class RunExperimentalPipeline(pipeline.Pipeline):

  def run(self, bench_name, description):  # pylint: disable=invalid-name
    """The root pipeline that starts simulation tasks and generates a report.

    This spawns tasks that in turn spawn the simulation tasks, and then runs
    the report generation task on the aggregated results.

    Args:
      bench_name: A string bench name.
      description: A string description of this bench job.

    Yields:
      Pipeline instance.
    """
    test_bench_keys = TestBench.query().fetch(keys_only=True)
    test_bench_ids = [k.integer_id() for k in test_bench_keys]

    results = []
    # Number of task queue tasks to spawn per child pipeline.
    pipeline_chunk_size = 1000
    for i in xrange(0, len(test_bench_ids), pipeline_chunk_size):
      id_chunk = test_bench_ids[i:i + pipeline_chunk_size]
      result_future = yield RunExperimentalChunkPipeline(
          bench_name, id_chunk)
      results.append(result_future)

    combined_results = yield pipeline_common.Extend(*results)
    yield GenerateComparisonReportPipeline(
        bench_name, description, combined_results)


def SetupBaseDataForBench():
  """Adds tasks to the task queue to create base data for the bench."""
  if TestBench.query().fetch(keys_only=True, limit=1):
    raise Exception('Base data already exists.')

  # This will take a while, so it is done in a task queue task.
  deferred.defer(_AddCreateTestBenchTasks, _queue=_TASK_QUEUE_NAME)


def BenchFindChangePoints(bench_name, description):
  """Submits a bench job for a bench_name and description.

  Requires an implementation of find_change_points added to
  _EXPERIMENTAL_FUNCTIONS. The combination of bench_name and description
  must be unique for each job.

  Args:
    bench_name: A string bench name, which must be one of the keys of
        _EXPERIMENTAL_FUNCTIONS.
    description: A string description of this bench job.

  Raises:
    ValueError: The input was not valid.
    Exception: Not enough data available.
  """
  if bench_name not in _EXPERIMENTAL_FUNCTIONS:
    raise ValueError('%s is not a valid find anomalies bench function.' %
                     bench_name)

  bench_key = '%s.%s' % (bench_name, description)
  submitted_benches = layered_cache.Get(_FIND_ANOMALIES_BENCH_CACHE_KEY)
  if not submitted_benches:
    submitted_benches = {}
  if bench_key in submitted_benches:
    raise ValueError('Bench job for "%s. %s" was already submitted.' %
                     (bench_name, description))

  submitted_benches[bench_key] = True
  layered_cache.Set(_FIND_ANOMALIES_BENCH_CACHE_KEY, submitted_benches)

  # Check whether base bench data exists.
  if not TestBench.query().fetch(keys_only=True, limit=1):
    raise Exception('No base data available to bench against.')

  # Add a task to the task queue to run the simulation.
  stage = RunExperimentalPipeline(bench_name, description)
  stage.start(queue_name=_TASK_QUEUE_NAME)


def DeleteAllTestBenchEntities():
  """Deletes all TestBench data."""
  ndb.delete_multi(TestBench.query().fetch(keys_only=True))


def _AddCreateTestBenchTasks():
  """Adds _CreateTestBench tasks to the task queue."""
  sheriff_key = ndb.Key('Sheriff', _TEST_DATA_SHERIFF)
  query = graph_data.Test.query(
      graph_data.Test.sheriff == sheriff_key,
      graph_data.Test.has_rows == True,
      graph_data.Test.deprecated == False)

  tests = query.fetch(limit=_NUM_TEST_TO_BENCH)

  tests = [t for t in tests if _GetSheriffForTest(t) and not _IsRefBuild(t)]
  for test in tests:
    deferred.defer(_CreateTestBench, test.key, _queue=_TASK_QUEUE_NAME)


def _CreateTestBench(test_key):
  """Fetches and stores the test and row data used to run the bench."""
  # Get Row entities.
  query = graph_data.Row.query(projection=['revision', 'value'])
  query = query.filter(graph_data.Row.parent_test == test_key)
  query = query.order(-graph_data.Row.revision)
  rows = list(reversed(query.fetch(limit=_NUM_ROWS_TO_BENCH)))
  data_series = [(row.revision, row.value) for row in rows]

  # Add a TestBench entity.
  test_bench = TestBench(test=test_key, data_series=data_series)
  _UpdateInvalidAndConfirmedAnomalyRevs(test_bench)
  _RunBaseAlertProcessing(test_bench)
  test_bench.put()


def _AddReportToLog(report_dict):
  """Adds a quick_logger log for the bench results."""
  report = _REPORT_TEMPLATE % report_dict
  formatter = quick_logger.Formatter()
  logger = quick_logger.QuickLogger(
      'bench_find_anomalies', 'report', formatter)
  logger.Log(report)
  logger.Save()


def _Flatten(list_of_list):
  """Creates a set of all items in the sublists."""
  flattened = set()
  for item in list_of_list:
    flattened.update(item)
  return flattened


def _UpdateInvalidAndConfirmedAnomalyRevs(test_bench):
  """Updates a TestBench entity with invalid and confirmed anomaly revs."""

  # The start rev for fetching Anomalies should be at min_segment_size.
  test = test_bench.test.get()
  config_dict = anomaly_config.GetAnomalyConfigDict(test)
  min_segment_size = config_dict.get(
      'min_segment_size', find_change_points.MIN_SEGMENT_SIZE)
  start_index = min(min_segment_size, len(test_bench.data_series)) - 1
  start_rev = test_bench.data_series[start_index][0]

  query = anomaly.Anomaly.query(anomaly.Anomaly.test == test_bench.test)
  anomalies = query.fetch()
  anomalies.sort(key=lambda a: a.end_revision)
  anomalies = [a for a in anomalies if a.end_revision >= start_rev and
               not a.is_improvement]

  test_bench.invalid_anomaly_revs = [
      _GetRevsAroundRev(test_bench.data_series, a.end_revision)
      for a in anomalies if a.bug_id == -1]
  test_bench.confirmed_anomaly_revs = [
      _GetRevsAroundRev(test_bench.data_series, a.end_revision)
      for a in anomalies if a.bug_id > 0]


def _RunBaseAlertProcessing(test_bench):
  """Runs the base alert processing simulation on a TestBench entity.

  This function runs the current find_change_points.FindChangePoints
  implementation and saves the revisions around the found anomalies to
  a TestBench entity.

  Args:
    test_bench: A TestBench entity.
  """
  test = test_bench.test.get()
  config_dict = anomaly_config.GetAnomalyConfigDict(test)
  change_points = debug_alert.SimulateAlertProcessing(
      test_bench.data_series, **config_dict)

  test_bench.base_anomaly_revs = [
      _GetRevsAroundRev(test_bench.data_series, change_point.x_value)
      for change_point in change_points if _IsRegression(change_point, test)]


def _GetRevsAroundRev(data_series, revision):
  """Gets a list of revisions from before to after a given revision.

  Args:
    data_series: A list of (revision, value) pairs.
    revision: A revision number.

  Returns:
    A list of revisions.
  """
  if not _MAX_SEGMENT_SIZE_AROUND_ANOMALY:
    return [revision]

  middle_index = 0
  for i in xrange(len(data_series)):
    if data_series[i][0] == revision:
      middle_index = i
      break
  start_index = max(0, middle_index - _MAX_SEGMENT_SIZE_AROUND_ANOMALY)
  end_index = middle_index + _MAX_SEGMENT_SIZE_AROUND_ANOMALY + 1
  series_around_rev = data_series[start_index:end_index]
  return [s[0] for s in series_around_rev]
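
# Worked example for _GetRevsAroundRev (hypothetical data): with
# _MAX_SEGMENT_SIZE_AROUND_ANOMALY = 4 and a data_series whose revisions are
# 100, 101, ..., 110, _GetRevsAroundRev(data_series, 105) returns
# [101, 102, ..., 109]: the four revisions on each side of 105 plus 105
# itself, truncated at the edges of the series when necessary.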


def _IsRefBuild(test):
  """Returns True if the test is for a reference build."""
  # string_id() is the last component of the Test key, e.g. 'ref' or 'foo_ref'.
  key_path = test.key.string_id()
  return key_path == 'ref' or key_path.endswith('_ref')


def _GetSheriffForTest(test):
  """Gets the Sheriff for a test, or None if there is no sheriff."""
  if test.sheriff:
    return test.sheriff.get()
  return None


def _IsRegression(change_point, test):
  """Returns whether the alert is a regression for the given test.

  Args:
    change_point: A find_change_points.ChangePoint object.
    test: Test to get the regression direction for.

  Returns:
    True if it is a regression anomaly, otherwise False.
  """
  median_before = change_point.median_before
  median_after = change_point.median_after
  if (median_before < median_after and
      test.improvement_direction == anomaly.UP):
    return False
  if (median_before >= median_after and
      test.improvement_direction == anomaly.DOWN):
    return False
  return True