1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
# This file defines helper functions for putting entries into, and querying,
# elasticsearch.
6
7"""Utils for sending metadata to elasticsearch
8
Elasticsearch is a NoSQL document store and search engine.
Source is here: https://github.com/elastic/elasticsearch
We use es to store our metadata.
12
13For example, if we wanted to store the following metadata:
14
15metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
20}
21
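The following call will send metadata to the default es server.
    autotest_es.post(index=index, metadata=metadata)
Pass arguments by keyword: the first positional parameter of post() is
use_http, so positional arguments would be bound to the connection settings.
The es host and port can also be overridden with the host and port keyword
arguments.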
25
For testing: when tests put entries into a single index, we often want to
clear that index of all entries before running them. Use the clear_index
function for this.
29(see es_utils_functionaltest.py for an example)
30
31This file also contains methods for sending queries to es. Currently,
32the query (json dict) we send to es is quite complicated (but flexible).
33
34For example, the below query returns job_id, host_id, and job_start
35for all job_ids in [0, 99999] and host_id matching 10.
36
37range_eq_query = {
38    'fields': ['job_id', 'host_id', 'job_start'],
39    'query': {
40        'filtered': {
41            'query': {
42                'match': {
43                    'host_id': 10,
44                }
            },
46            'filter': {
47                'range': {
48                    'job_id': {
49                        'gte': 0,
50                        'lte': 99999,
51                    }
52                }
53            }
54        }
55    }
56}
57
58To send a query once it is created, call execute_query() to send it to the
59intended elasticsearch server. The query() function can be used to construct a
60query with certain parameters and execute it all in one call.
61
62"""
63
64import es_utils
65
66import common
67from autotest_lib.client.common_lib import global_config
68
69
# Elasticsearch server and ports used for metadata (and only for metadata).
METADATA_ES_SERVER = global_config.global_config.get_config_value(
        'CROS', 'ES_HOST', default='localhost')
ES_PORT = global_config.global_config.get_config_value(
        'CROS', 'ES_PORT', default=9200, type=int)
ES_UDP_PORT = global_config.global_config.get_config_value(
        'CROS', 'ES_UDP_PORT', default=9700, type=int)
# Default transport for post(): HTTP when True, UDP otherwise. UDP adds very
# little overhead (around 3 ms), whereas HTTP (TCP) takes ~500 ms for the
# first connection and 50-100 ms for subsequent connections.
ES_USE_HTTP = global_config.global_config.get_config_value(
        'CROS', 'ES_USE_HTTP', default=False, type=bool)

# Index that metadata goes into: CLIENT/metadata_index when it is set to a
# non-empty value, otherwise the autotest instance name (SERVER/hostname).
INDEX_METADATA = (
        global_config.global_config.get_config_value(
                'CLIENT', 'metadata_index', default=None, type=str)
        or global_config.global_config.get_config_value(
                'SERVER', 'hostname', default='localhost', type=str))

# Default number of upload attempts made by bulk_post().
DEFAULT_BULK_POST_RETRIES = 5
92
def post(use_http=ES_USE_HTTP, host=METADATA_ES_SERVER, port=ES_PORT,
         timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
         udp_port=ES_UDP_PORT,
         *args, **kwargs):
    """Send metadata to elasticsearch via es_utils.ESMetadata.post().

    The connection-related arguments below build an es_utils.ESMetadata
    object. All remaining arguments are forwarded unchanged to its post()
    method. See those functions in es_utils for the meaning of each.

    Positional arguments bind to the connection parameters first, before
    anything reaches *args. Pass the ESMetadata.post() arguments by keyword.

    @param use_http: True to send over HTTP, False to use UDP.
    @param host: Elasticsearch server host.
    @param port: Elasticsearch HTTP port.
    @param timeout: Timeout passed to es_utils.ESMetadata.
    @param index: Index to write the metadata into.
    @param udp_port: Port passed to es_utils.ESMetadata, presumably used
            for the UDP transport (use_http=False).

    @returns: Whatever ESMetadata.post() returns.
    """
    connection_settings = dict(use_http=use_http, host=host, port=port,
                               timeout=timeout, index=index,
                               udp_port=udp_port)
    metadata_client = es_utils.ESMetadata(**connection_settings)
    return metadata_client.post(*args, **kwargs)
105
106
def bulk_post(data_list, host=METADATA_ES_SERVER, port=ES_PORT,
              timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
              retries=DEFAULT_BULK_POST_RETRIES, *args, **kwargs):
    """Upload a list of metadata using the Elasticsearch bulk API.

    The connection-related arguments build an es_utils.ESMetadata object
    that always uses HTTP. data_list and any remaining arguments are
    forwarded to its bulk_post() method. The bulk API can greatly enhance
    the performance of uploading data over HTTP. For an explanation of each
    argument, see those functions in es_utils.

    @param data_list: List of metadata to upload.
    @param host: Elasticsearch server host.
    @param port: Elasticsearch HTTP port.
    @param timeout: Timeout passed to es_utils.ESMetadata.
    @param index: Index to write the metadata into.
    @param retries: Maximum number of upload attempts. Values below 1 are
            treated as 1, so the data is always sent at least once.

    @returns: True as soon as one attempt succeeds, False if every attempt
            fails.
    """
    esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
                               timeout=timeout, index=index,
                               udp_port=ES_UDP_PORT)
    # A bulk post may fail due to the amount of data, so retry several
    # times. Always make at least one attempt; otherwise retries <= 0 would
    # silently drop the data without ever contacting the server.
    attempts = max(1, retries)
    # any() stops at the first successful attempt.
    return any(esmd.bulk_post(data_list, *args, **kwargs)
               for _ in range(attempts))
124
125
def execute_query(host=METADATA_ES_SERVER, port=ES_PORT,
                  timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
                  *args, **kwargs):
    """Send an already-built query to elasticsearch.

    The connection-related arguments build an HTTP-only
    es_utils.ESMetadata object. All remaining arguments are forwarded
    unchanged to its execute_query() method. See those functions in
    es_utils for the meaning of each.

    @param host: Elasticsearch server host.
    @param port: Elasticsearch HTTP port.
    @param timeout: Timeout passed to es_utils.ESMetadata.
    @param index: Index to run the query against.

    @returns: Whatever ESMetadata.execute_query() returns.
    """
    # Queries always go over HTTP. udp_port=0 is passed because the UDP
    # transport is presumably unused when use_http=True.
    return es_utils.ESMetadata(
            use_http=True, host=host, port=port, timeout=timeout,
            index=index, udp_port=0).execute_query(*args, **kwargs)
137
138
def query(host=METADATA_ES_SERVER, port=ES_PORT,
          timeout=es_utils.DEFAULT_TIMEOUT,
          index=INDEX_METADATA, *args, **kwargs):
    """Build a query from parameters and execute it in one call.

    The connection-related arguments build an HTTP-only
    es_utils.ESMetadata object. All remaining arguments are forwarded
    unchanged to its query() method. See those functions in es_utils for
    the meaning of each.

    @param host: Elasticsearch server host.
    @param port: Elasticsearch HTTP port.
    @param timeout: Timeout passed to es_utils.ESMetadata.
    @param index: Index to run the query against.

    @returns: Whatever ESMetadata.query() returns.
    """
    client = es_utils.ESMetadata(host=host, port=port, timeout=timeout,
                                 index=index, use_http=True, udp_port=0)
    run_query = client.query
    return run_query(*args, **kwargs)
150