# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file defines helper functions for putting entries into elasticsearch.

"""Utils for sending metadata to elasticsearch.

Elasticsearch is a NoSQL key-value store.
Source is here: https://github.com/elasticsearch/elasticsearch
We use es to store our metadata.

For example, suppose we want to store the following metadata:

metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
}

The following call sends metadata to the default es server:
    autotest_es.post(index=index, metadata=metadata)
We can also specify which host and port to use (host=..., port=...).

Pass arguments to post() and the other helpers in this file by keyword.
Their leading positional parameters are connection settings (post() takes
use_http first), so a positional call such as post(index, metadata) would
bind index to use_http. Keyword arguments that are not connection settings
are forwarded to the matching es_utils.ESMetadata method; check its
signature for the accepted names.

Using for testing: sometimes, when we choose a single index to put entries
into, we want to clear that index of all entries before running our tests.
Use the clear_index function (see es_utils_functionaltest.py for an example).

This file also contains methods for sending queries to es. Currently, the
query (json dict) we send to es is quite complicated (but flexible).

For example, the query below returns job_id, host_id, and job_start for all
job_ids in [0, 99999] and host_id matching 10.

range_eq_query = {
    'fields': ['job_id', 'host_id', 'job_start'],
    'query': {
        'filtered': {
            'query': {
                'match': {
                    'host_id': 10,
                }
            },
            'filter': {
                'range': {
                    'job_id': {
                        'gte': 0,
                        'lte': 99999,
                    }
                }
            }
        }
    }
}

To send a query once it is created, call execute_query() to send it to the
intended elasticsearch server. The query() function can be used to construct
a query with certain parameters and execute it all in one call.

"""

import es_utils

import common
from autotest_lib.client.common_lib import global_config


# Server and ports for elasticsearch (for metadata use only)
METADATA_ES_SERVER = global_config.global_config.get_config_value(
        'CROS', 'ES_HOST', default='localhost')
ES_PORT = global_config.global_config.get_config_value(
        'CROS', 'ES_PORT', type=int, default=9200)
ES_UDP_PORT = global_config.global_config.get_config_value(
        'CROS', 'ES_UDP_PORT', type=int, default=9700)
# Whether to use HTTP instead of UDP. UDP has very little overhead (around
# 3 ms), whereas HTTP (TCP) takes ~500 ms for the first connection and
# 50-100 ms for subsequent connections.
ES_USE_HTTP = global_config.global_config.get_config_value(
        'CROS', 'ES_USE_HTTP', type=bool, default=False)

# If CLIENT/metadata_index is not set, INDEX_METADATA falls back to
# autotest instance name (SERVER/hostname).
INDEX_METADATA = global_config.global_config.get_config_value(
        'CLIENT', 'metadata_index', type=str, default=None)
if not INDEX_METADATA:
    INDEX_METADATA = global_config.global_config.get_config_value(
            'SERVER', 'hostname', type=str, default='localhost')

# Maximum number of upload attempts made by bulk_post() by default.
DEFAULT_BULK_POST_RETRIES = 5


def post(use_http=ES_USE_HTTP, host=METADATA_ES_SERVER, port=ES_PORT,
         timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
         udp_port=ES_UDP_PORT,
         *args, **kwargs):
    """Post metadata to Elasticsearch through es_utils.ESMetadata.

    use_http, host, port, timeout, index and udp_port are passed to the
    es_utils.ESMetadata constructor. All remaining arguments are forwarded
    to its post() method. For an explanation of each, see those functions
    in es_utils.

    Pass the forwarded arguments by keyword. Positional arguments fill
    use_http, host, port, timeout, index and udp_port first, in that order.

    @returns: Whatever es_utils.ESMetadata.post() returns.
    """
    esmd = es_utils.ESMetadata(use_http=use_http, host=host, port=port,
                               timeout=timeout, index=index, udp_port=udp_port)
    return esmd.post(*args, **kwargs)


def bulk_post(data_list, host=METADATA_ES_SERVER, port=ES_PORT,
              timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
              retries=DEFAULT_BULK_POST_RETRIES, *args, **kwargs):
    """Upload a list of metadata using the Elasticsearch bulk API.

    host, port, timeout and index are passed to the es_utils.ESMetadata
    constructor, which is always created with use_http=True. Any remaining
    arguments are forwarded to its bulk_post() method. Uploading in bulk can
    greatly enhance the performance of uploading data over HTTP. For an
    explanation of each argument, see those functions in es_utils.

    @param data_list: List of metadata to upload.
    @param retries: Maximum number of upload attempts. Values below 1 are
            treated as 1, so at least one attempt is always made.

    @returns: True as soon as one attempt reports success (a truthy return
            from ESMetadata.bulk_post()), False if every attempt fails.
    """
    esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
                               timeout=timeout, index=index,
                               udp_port=ES_UDP_PORT)
    # A bulk post may fail due to the amount of data, so retry several times.
    # Clamp to at least one attempt so that retries <= 0 does not silently
    # report failure without ever trying to upload.
    for _ in range(max(1, retries)):
        if esmd.bulk_post(data_list, *args, **kwargs):
            return True
    return False


def execute_query(host=METADATA_ES_SERVER, port=ES_PORT,
                  timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
                  *args, **kwargs):
    """Send an already-built query to Elasticsearch.

    host, port, timeout and index are passed to the es_utils.ESMetadata
    constructor. The constructor is always called with use_http=True and
    udp_port=0 (presumably because queries need HTTP responses; confirm in
    es_utils). Any other arguments are forwarded to its execute_query()
    method. For an explanation of each, see those functions in es_utils.

    Pass the forwarded arguments by keyword. Positional arguments fill host,
    port, timeout and index first.

    @returns: Whatever es_utils.ESMetadata.execute_query() returns.
    """
    esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
                               timeout=timeout, index=index, udp_port=0)
    return esmd.execute_query(*args, **kwargs)


def query(host=METADATA_ES_SERVER, port=ES_PORT,
          timeout=es_utils.DEFAULT_TIMEOUT,
          index=INDEX_METADATA, *args, **kwargs):
    """Build a query from parameters and execute it in one call.

    host, port, timeout and index are passed to the es_utils.ESMetadata
    constructor, which is always called with use_http=True and udp_port=0.
    Any other arguments are forwarded to its query() method. For an
    explanation of each, see those functions in es_utils.

    Pass the forwarded arguments by keyword. Positional arguments fill host,
    port, timeout and index first.

    @returns: Whatever es_utils.ESMetadata.query() returns.
    """
    esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
                               timeout=timeout, index=index, udp_port=0)
    return esmd.query(*args, **kwargs)