# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file defines helper functions for putting entries into elasticsearch.

"""Utils for sending metadata to elasticsearch.

Elasticsearch is an open-source, document-oriented NoSQL search engine.
Source is here: https://github.com/elasticsearch/elasticsearch
We will be using es to store our metadata.

For example, if we wanted to store the following metadata:

metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
}

the following call would send the metadata to the es server that the
ESMetadata instance was constructed for; the constructor arguments select
the host, port, index, and transport (HTTP or UDP):
    es_utils.ESMetadata(...).post(type_str, metadata)
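
A construction sketch (the host, port, index, and udp_port values here are
illustrative):
    es_meta = es_utils.ESMetadata(use_http=True, host='localhost', port=9200,
                                  index='metadata_index', udp_port=9700)
    es_meta.post('job_time', metadata)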

Use in testing: sometimes, when we put entries into a single index, we want
to clear that index of all entries before running our tests. Use the
clear_index function (see es_utils_functionaltest.py for an example).

This file also contains methods for sending queries to es. The query (a
json dict) we send to es is quite complicated, but flexible; the
_compose_query method below builds the common useful cases, and the query()
convenience method composes and executes a query in one call.

For example, the query below returns job_id, host_id, and job_start
for all job_ids in [0, 99999] with host_id matching 10.

range_eq_query = {
    'fields': ['job_id', 'host_id', 'job_start'],
    'query': {
        'filtered': {
            'query': {
                'match': {
                    'host_id': 10,
                }
            },
            'filter': {
                'range': {
                    'job_id': {
                        'gte': 0,
                        'lte': 99999,
                    }
                }
            }
        }
    }
}

To send a query once it is created, call execute_query() to send it to the
intended elasticsearch server.
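
A sketch of composing and executing the equivalent query with the helpers
below (es_meta as constructed above; field names are illustrative):
    result = es_meta.query(
            fields_returned=['job_id', 'host_id', 'job_start'],
            equality_constraints=[('host_id', 10)],
            range_constraints=[('job_id', 0, 99999)])
    for hit in result.hits:
        print hit['job_id']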

"""

import collections
import json
import logging
import socket
import time

try:
    import elasticsearch
    from elasticsearch import helpers as elasticsearch_helpers
except ImportError:
    logging.debug('Failed to import elasticsearch. Mock classes will be used '
                  'and calls to Elasticsearch server will be no-op. Test run '
                  'is not affected by the missing elasticsearch module.')
    import elasticsearch_mock as elasticsearch
    elasticsearch_helpers = elasticsearch.Elasticsearch()


# Global timeout, in seconds, for connections to the es server.
DEFAULT_TIMEOUT = 30

# Default result size for a query.
DEFAULT_RESULT_SIZE = 10**4
# Default result size when scrolling query results.
DEFAULT_SCROLL_SIZE = 5*10**4


class EsUtilException(Exception):
    """Exception raised when functions here fail."""
    pass


QueryResult = collections.namedtuple('QueryResult', ['total', 'hits'])


class ESMetadata(object):
    """Class handling es connection for metadata."""

    @property
    def es(self):
        """Read-only property; lazily initialized."""
        if not self._es:
            self._es = elasticsearch.Elasticsearch(host=self.host,
                                                   port=self.port,
                                                   timeout=self.timeout)
        return self._es


    def __init__(self, use_http, host, port, index, udp_port,
                 timeout=DEFAULT_TIMEOUT):
        """Initialize ESMetadata object.

        @param use_http: Whether to send data to ES using HTTP.
        @param host: Elasticsearch host.
        @param port: Elasticsearch port.
        @param index: What index the metadata is stored in.
        @param udp_port: What port to use for UDP data.
        @param timeout: How long to wait while connecting to es.
        """
        self.use_http = use_http
        self.host = host
        self.port = port
        self.index = index
        self.udp_port = udp_port
        self.timeout = timeout
        self._es = None


    def _send_data_http(self, type_str, metadata):
        """Sends data to insert into elasticsearch using HTTP.

        @param type_str: Sets the _type field in the elasticsearch db.
        @param metadata: Dictionary object containing the metadata.
        """
        try:
            self.es.index(index=self.index, doc_type=type_str, body=metadata)
        except elasticsearch.ElasticsearchException as e:
            # Mute exceptions from metadata reporting to prevent metadata
            # reporting errors from killing the test.
            logging.error(e)


    def _send_data_udp(self, type_str, metadata):
        """Sends data to insert into elasticsearch using UDP.

        @param type_str: Sets the _type field in the elasticsearch db.
        @param metadata: Dictionary object containing the metadata.
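
        The message follows es's bulk-index line protocol: a one-line
        action header followed by the one-line JSON document, e.g. (index,
        type, and field values here are illustrative):
            {"index" : {"_index" : "metadata_index", "_type" : "job_time"}}
            {"job_id" : 20, "time_start" : 100000}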
        """
        try:
            # Header.
            message = json.dumps(
                    {'index': {'_index': self.index, '_type': type_str}},
                    separators=(', ', ' : '))
            message += '\n'
            # Metadata.
            message += json.dumps(metadata, separators=(', ', ' : '))
            message += '\n'

            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.sendto(message, (self.host, self.udp_port))
        except socket.error as e:
            logging.warning(e)


    def post(self, type_str, metadata, log_time_recorded=True, **kwargs):
        """Wraps the _send_data_* calls to insert an entry into elasticsearch.

        @param type_str: Sets the _type field in the elasticsearch db.
        @param metadata: Dictionary object containing the metadata.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
        @param kwargs: Additional metadata fields.

        @return: True if the post action succeeded, otherwise False.

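        A usage sketch (the type string, field names, and extra keyword
        argument are illustrative; kwargs become extra metadata fields):
            es_meta.post('job_time', {'job_id': 20, 'time_start': 100000},
                         board='some-board')
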
        """
        if not metadata:
            return True

        metadata = metadata.copy()
        metadata.update(kwargs)
        # '_type' is a reserved key in es; if the metadata carries one, use
        # it as the type string and strip it from the posted document.
        if '_type' in metadata:
            type_str = metadata['_type']
            del metadata['_type']
        if log_time_recorded:
            metadata['time_recorded'] = time.time()
        try:
            if self.use_http:
                self._send_data_http(type_str, metadata)
            else:
                self._send_data_udp(type_str, metadata)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def bulk_post(self, data_list, log_time_recorded=True, **kwargs):
        """Inserts a batch of entries via elasticsearch_helpers.bulk.

        @param data_list: A list of dictionary objects containing metadata.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
        @param kwargs: Additional metadata fields.

        @return: True if the post action succeeded, otherwise False.

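        A usage sketch (field names and the extra keyword argument are
        illustrative):
            es_meta.bulk_post([{'job_id': 20}, {'job_id': 21}],
                              suite='some-suite')
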
        """
        if not data_list:
            return True

        actions = []
        for metadata in data_list:
            metadata = metadata.copy()
            metadata.update(kwargs)
            if log_time_recorded and 'time_recorded' not in metadata:
                metadata['time_recorded'] = time.time()
            metadata['_index'] = self.index
            actions.append(metadata)

        try:
            elasticsearch_helpers.bulk(self.es, actions)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def _compose_query(self, equality_constraints=(), fields_returned=None,
                       range_constraints=(), size=DEFAULT_RESULT_SIZE,
                       sort_specs=None, regex_constraints=(),
                       batch_constraints=()):
        """Creates a dict representing multiple range and/or equality queries.

        Example input:
        _compose_query(
            fields_returned = ['time_recorded', 'hostname',
                               'status', 'dbg_str'],
            equality_constraints = [
                ('_type', 'host_history'),
                ('hostname', '172.22.169.106'),
            ],
            range_constraints = [
                ('time_recorded', 1405628341.904379, 1405700341.904379)
            ],
            size=20,
            sort_specs=[
                'hostname',
                {'time_recorded': 'asc'},
            ]
        )

        Output:
        {
            'fields': ['time_recorded', 'hostname', 'status', 'dbg_str'],
            'query': {
                'bool': {
                    'must': [
                        {
                            'term': {
                                '_type': 'host_history'
                            }
                        },
                        {
                            'term': {
                                'hostname': '172.22.169.106'
                            }
                        },
                        {
                            'range': {
                                'time_recorded': {
                                    'gte': 1405628341.904379,
                                    'lte': 1405700341.904379
                                }
                            }
                        }
                    ]
                },
            },
            'size': 20,
            'sort': [
                'hostname',
                {'time_recorded': 'asc'},
            ]
        }

        @param equality_constraints: list of tuples of (field, value) pairs
            representing what each field should equal in the query.
            e.g. [ ('field1', 1), ('field2', 'value') ]
        @param fields_returned: list of fields that we should return when
            the query is executed. Set it to None to return all fields. Note
            that the key/vals will be stored in the _source key of the hit
            object if fields_returned is set to None.
        @param range_constraints: list of tuples of (field, low, high) pairs
            representing what each field should be between (inclusive).
            e.g. [ ('field1', 2, 10), ('field2', -1, 20) ]
            If you want one side to be unbounded, you can use None.
            e.g. [ ('field1', 2, None) ] means value of field1 >= 2.
        @param size: max number of entries to return. Default is 10000.
        @param sort_specs: A list of fields to sort on; ties are broken by
            the next field(s) in the list.
        @param regex_constraints: A list of tuples of (field, regex) pairs,
            e.g., [('field1', '.*value.*')].
        @param batch_constraints: list of tuples of (field, list) pairs
            representing each field should be equal to one of the values
            in the list.
            e.g., [ ('job_id', [10, 11, 12, 13]) ]
        @returns: dictionary object that represents the query to es.
        @raises EsUtilException: if there are no equality constraints and
            no range constraints.
        """
        if not equality_constraints and not range_constraints:
            raise EsUtilException('No range or equality constraints specified.')

        # Creates the list of range dictionaries to put in the 'must' list.
        range_list = []
        if range_constraints:
            for key, low, high in range_constraints:
                if low is None and high is None:
                    continue
                temp_dict = {}
                if low is not None:
                    temp_dict['gte'] = low
                if high is not None:
                    temp_dict['lte'] = high
                range_list.append({'range': {key: temp_dict}})

        # Creates the lists of term dictionaries to put in the 'must' list.
        eq_list = [{'term': {k: v}} for k, v in equality_constraints if k]
        batch_list = [{'terms': {k: v}} for k, v in batch_constraints if k]
        regex_list = [{'regexp': {k: v}} for k, v in regex_constraints if k]
        constraints = eq_list + batch_list + range_list + regex_list
        query = {
            'query': {
                'bool': {
                    'must': constraints,
                }
            },
        }
        if fields_returned:
            query['fields'] = fields_returned
        query['size'] = size
        if sort_specs:
            query['sort'] = sort_specs
        return query


    def execute_query(self, query):
        """Makes a query on the given index.

        @param query: Query dictionary (see _compose_query).
        @returns: A QueryResult instance describing the result.

        Example of the raw es response the result is built from:
        {
            "took" : 5,
            "timed_out" : false,
            "_shards" : {
                "total" : 16,
                "successful" : 16,
                "failed" : 0
            },
            "hits" : {
                "total" : 4,
                "max_score" : 1.0,
                "hits" : [ {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "rtntrjgdsafdsfdsfdsfdsfdssssssss",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 1,
                               "job_id": 22,
                               "time_start": 400}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfgfddddddddddddddddddddddhhh",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 2,
                               "job_id": 23,
                               "time_start": 405}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "erwerwerwewtrewgfednvfngfngfrhfd",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 3,
                               "job_id": 24,
                               "time_start": 4098}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfherjgwetfrsupbretowegoegheorgsa",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 22,
                               "job_id": 25,
                               "time_start": 4200}
                } ]
            }
        }
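
        A consumption sketch (names are illustrative): the returned
        QueryResult flattens each hit, unpacking single-element lists when
        'fields' was requested, or copying '_source' otherwise.
            result = es_meta.execute_query(query)
            for hit in result.hits:
                print hit['job_id']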

        """
        if not self.es.indices.exists(index=self.index):
            logging.error('Index (%s) does not exist on %s:%s',
                          self.index, self.host, self.port)
            return None
        result = self.es.search(index=self.index, body=query)
        # Check whether all matched records were returned; the size may have
        # been set too small. Special-case size set to 1, as that means the
        # query only cares about the first matched entry.
        # TODO: Use pagination in Elasticsearch. This needs a major change in
        #       how query results are iterated.
        size = query.get('size', 1)
        need_scroll = 'size' in query and size == DEFAULT_RESULT_SIZE
        return_count = len(result['hits']['hits'])
        total_match = result['hits']['total']
        if total_match > return_count and need_scroll:
            logging.warning('There are %d matched records, only %d entries '
                            'are returned. Query size is set to %d. Will try '
                            'to use scroll command to get all entries.',
                            total_match, return_count, size)
            # Try to get all results with scroll.
            hits = self._get_results_by_scan(query, total_match)
        else:
            hits = result['hits']['hits']
        # Extract the actual results from the query.
        output = QueryResult(total_match, [])
        for hit in hits:
            converted = {}
            if 'fields' in hit:
                for key, value in hit['fields'].items():
                    converted[key] = value[0] if len(value) == 1 else value
            else:
                converted = hit['_source'].copy()
            output.hits.append(converted)
        return output


    def _get_results_by_scan(self, query, total_match=None):
        """Get all results by using scan.

        @param query: Query dictionary (see _compose_query).
        @param total_match: The number of total matched results. Pass the
                value in so the code doesn't need to run another query to
                get it.

        @returns: A list of matched results.
        """
        if not total_match:
            # Reduce the return size to make the query run faster.
            query['size'] = 1
            result = self.es.search(index=self.index, body=query)
            total_match = result['hits']['total']
        # Remove the sort from the query so the scroll method can run faster.
        sort = None
        if 'sort' in query:
            sort = query['sort']
            if len(sort) > 1:
                raise EsUtilException('_get_results_by_scan does not support '
                                      'sort with more than one key: %s' % sort)
            del query['sort']
        query.pop('size', None)
        scroll = elasticsearch_helpers.scan(self.es, query=query,
                                            index=self.index,
                                            size=DEFAULT_SCROLL_SIZE)
        hits = []
        next_mark = 0
        for hit in scroll:
            hits.append(hit)
            downloaded_percent = 100 * float(len(hits)) / total_match
            if downloaded_percent > next_mark:
                logging.debug('%2.0f%% downloaded (%d)', downloaded_percent,
                              len(hits))
                next_mark += 5
        logging.debug('Number of hits found: %s', len(hits))

        if sort:
            logging.debug('Sort hits with rule: %s', sort)
            sort_key = sort[0].keys()[0]
            is_desc = sort[0].values()[0] == 'desc'
            # If the query has `fields` specified, the hit dict stores values
            # in hit['fields']; otherwise, the keyvals are stored in
            # hit['_source'].
            key = lambda hit: (hit['_source'][sort_key] if '_source' in hit
                               else hit['fields'][sort_key][0])
            hits = sorted(hits, key=key, reverse=is_desc)

        return hits


    def query(self, *args, **kwargs):
        """The arguments to this function are the same as _compose_query."""
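        # A minimal usage sketch (type and field names are illustrative):
        #     result = es_meta.query(
        #             equality_constraints=[('_type', 'host_history')],
        #             range_constraints=[('time_recorded', 1405628341, None)])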
        query = self._compose_query(*args, **kwargs)
        return self.execute_query(query)
