1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5# This file defines helper functions for putting entries into elasticsearch. 6 7"""Utils for sending metadata to elasticsearch 8 9Elasticsearch is a key-value store NOSQL database. 10Source is here: https://github.com/elasticsearch/elasticsearch 11We will be using es to store our metadata. 12 13For example, if we wanted to store the following metadata: 14 15metadata = { 16 'host_id': 1 17 'job_id': 20 18 'time_start': 100000 19 'time_recorded': 100006 20} 21 22The following call will send metadata to the default es server. 23 es_utils.ESMetadata().post(index, metadata) 24We can also specify which port and host to use. 25 26Using for testing: Sometimes, when we choose a single index 27to put entries into, we want to clear that index of all 28entries before running our tests. Use clear_index function. 29(see es_utils_functionaltest.py for an example) 30 31This file also contains methods for sending queries to es. Currently, 32the query (json dict) we send to es is quite complicated (but flexible). 33We've included several methods that composes queries that would be useful. 34These methods are all named create_*_query() 35 36For example, the below query returns job_id, host_id, and job_start 37for all job_ids in [0, 99999] and host_id matching 10. 38 39range_eq_query = { 40 'fields': ['job_id', 'host_id', 'job_start'], 41 'query': { 42 'filtered': { 43 'query': { 44 'match': { 45 'host_id': 10, 46 } 47 } 48 'filter': { 49 'range': { 50 'job_id': { 51 'gte': 0, 52 'lte': 99999, 53 } 54 } 55 } 56 } 57 } 58} 59 60To send a query once it is created, call execute_query() to send it to the 61intended elasticsearch server. 62 63""" 64 65import collections 66import json 67import logging 68import socket 69import time 70 71try: 72 import elasticsearch 73 from elasticsearch import helpers as elasticsearch_helpers 74except ImportError: 75 logging.debug('Failed to import elasticsearch. Mock classes will be used ' 76 'and calls to Elasticsearch server will be no-op. Test run ' 77 'is not affected by the missing elasticsearch module.') 78 import elasticsearch_mock as elasticsearch 79 elasticsearch_helpers = elasticsearch.Elasticsearch() 80 81 82# Global timeout for connection to esdb timeout. 83DEFAULT_TIMEOUT = 30 84 85# Default result size for a query. 86DEFAULT_RESULT_SIZE = 10**4 87# Default result size when scrolling query results. 88DEFAULT_SCROLL_SIZE = 5*10**4 89 90class EsUtilException(Exception): 91 """Exception raised when functions here fail. """ 92 pass 93 94 95QueryResult = collections.namedtuple('QueryResult', ['total', 'hits']) 96 97 98class ESMetadata(object): 99 """Class handling es connection for metadata.""" 100 101 @property 102 def es(self): 103 """Read only property, lazily initialized""" 104 if not self._es: 105 self._es = elasticsearch.Elasticsearch(host=self.host, 106 port=self.port, 107 timeout=self.timeout) 108 return self._es 109 110 111 def __init__(self, use_http, host, port, index, udp_port, 112 timeout=DEFAULT_TIMEOUT): 113 """Initialize ESMetadata object. 114 115 @param use_http: Whether to send data to ES using HTTP. 116 @param host: Elasticsearch host. 117 @param port: Elasticsearch port. 118 @param index: What index the metadata is stored in. 119 @param udp_port: What port to use for UDP data. 120 @param timeout: How long to wait while connecting to es. 121 """ 122 self.use_http = use_http 123 self.host = host 124 self.port = port 125 self.index = index 126 self.udp_port = udp_port 127 self.timeout = timeout 128 self._es = None 129 130 131 def _send_data_http(self, type_str, metadata): 132 """Sends data to insert into elasticsearch using HTTP. 133 134 @param type_str: sets the _type field in elasticsearch db. 135 @param metadata: dictionary object containing metadata 136 """ 137 try: 138 self.es.index(index=self.index, doc_type=type_str, body=metadata) 139 except elasticsearch.ElasticsearchException as e: 140 # Mute exceptions from metadata reporting to prevent meta data 141 # reporting errors from killing test. 142 logging.error(e) 143 144 145 def _send_data_udp(self, type_str, metadata): 146 """Sends data to insert into elasticsearch using UDP. 147 148 @param type_str: sets the _type field in elasticsearch db. 149 @param metadata: dictionary object containing metadata 150 """ 151 try: 152 # Header. 153 message = json.dumps( 154 {'index': {'_index': self.index, '_type': type_str}}, 155 separators=(', ', ' : ')) 156 message += '\n' 157 # Metadata. 158 message += json.dumps(metadata, separators=(', ', ' : ')) 159 message += '\n' 160 161 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 162 sock.sendto(message, (self.host, self.udp_port)) 163 except socket.error as e: 164 logging.warn(e) 165 166 167 def post(self, type_str, metadata, log_time_recorded=True, **kwargs): 168 """Wraps call of send_data, inserts entry into elasticsearch. 169 170 @param type_str: Sets the _type field in elasticsearch db. 171 @param metadata: Dictionary object containing metadata 172 @param log_time_recorded: Whether to automatically record the time 173 this metadata is recorded. Default is True. 174 @param kwargs: Additional metadata fields 175 176 @return: True if post action succeeded. Otherwise return False. 177 178 """ 179 if not metadata: 180 return True 181 182 metadata = metadata.copy() 183 metadata.update(kwargs) 184 # metadata should not contain anything with key '_type' 185 if '_type' in metadata: 186 type_str = metadata['_type'] 187 del metadata['_type'] 188 if log_time_recorded: 189 metadata['time_recorded'] = time.time() 190 try: 191 if self.use_http: 192 self._send_data_http(type_str, metadata) 193 else: 194 self._send_data_udp(type_str, metadata) 195 return True 196 except elasticsearch.ElasticsearchException as e: 197 logging.error(e) 198 return False 199 200 201 def bulk_post(self, data_list, log_time_recorded=True, **kwargs): 202 """Wraps call of send_data, inserts entry into elasticsearch. 203 204 @param data_list: A list of dictionary objects containing metadata. 205 @param log_time_recorded: Whether to automatically record the time 206 this metadata is recorded. Default is True. 207 @param kwargs: Additional metadata fields 208 209 @return: True if post action succeeded. Otherwise return False. 210 211 """ 212 if not data_list: 213 return True 214 215 actions = [] 216 for metadata in data_list: 217 metadata = metadata.copy() 218 metadata.update(kwargs) 219 if log_time_recorded and not 'time_recorded' in metadata: 220 metadata['time_recorded'] = time.time() 221 metadata['_index'] = self.index 222 actions.append(metadata) 223 224 try: 225 elasticsearch_helpers.bulk(self.es, actions) 226 return True 227 except elasticsearch.ElasticsearchException as e: 228 logging.error(e) 229 return False 230 231 232 def _compose_query(self, equality_constraints=[], fields_returned=None, 233 range_constraints=[], size=DEFAULT_RESULT_SIZE, 234 sort_specs=None, regex_constraints=[], 235 batch_constraints=[]): 236 """Creates a dict. representing multple range and/or equality queries. 237 238 Example input: 239 _compose_query( 240 fields_returned = ['time_recorded', 'hostname', 241 'status', 'dbg_str'], 242 equality_constraints = [ 243 ('_type', 'host_history'), 244 ('hostname', '172.22.169.106'), 245 ], 246 range_constraints = [ 247 ('time_recorded', 1405628341.904379, 1405700341.904379) 248 ], 249 size=20, 250 sort_specs=[ 251 'hostname', 252 {'time_recorded': 'asc'}, 253 ] 254 ) 255 256 Output: 257 { 258 'fields': ['time_recorded', 'hostname', 'status', 'dbg_str'], 259 'query': { 260 'bool': { 261 'minimum_should_match': 3, 262 'should': [ 263 { 264 'term': { 265 '_type': 'host_history' 266 } 267 }, 268 { 269 'term': { 270 'hostname': '172.22.169.106' 271 } 272 }, 273 { 274 'range': { 275 'time_recorded': { 276 'gte': 1405628341.904379, 277 'lte': 1405700341.904379 278 } 279 } 280 } 281 ] 282 }, 283 }, 284 'size': 20 285 'sort': [ 286 'hostname', 287 { 'time_recorded': 'asc'}, 288 ] 289 } 290 291 @param equality_constraints: list of tuples of (field, value) pairs 292 representing what each field should equal to in the query. 293 e.g. [ ('field1', 1), ('field2', 'value') ] 294 @param fields_returned: list of fields that we should return when 295 the query is executed. Set it to None to return all fields. Note 296 that the key/vals will be stored in _source key of the hit object, 297 if fields_returned is set to None. 298 @param range_constraints: list of tuples of (field, low, high) pairs 299 representing what each field should be between (inclusive). 300 e.g. [ ('field1', 2, 10), ('field2', -1, 20) ] 301 If you want one side to be unbounded, you can use None. 302 e.g. [ ('field1', 2, None) ] means value of field1 >= 2. 303 @param size: max number of entries to return. Default is 100000. 304 @param sort_specs: A list of fields to sort on, tiebreakers will be 305 broken by the next field(s). 306 @param regex_constraints: A list of regex constraints of tuples of 307 (field, value) pairs, e.g., [('filed1', '.*value.*')]. 308 @param batch_constraints: list of tuples of (field, list) pairs 309 representing each field should be equal to one of the values 310 in the list. 311 e.g., [ ('job_id', [10, 11, 12, 13]) ] 312 @returns: dictionary object that represents query to es. 313 This will return None if there are no equality constraints 314 and no range constraints. 315 """ 316 if not equality_constraints and not range_constraints: 317 raise EsUtilException('No range or equality constraints specified.') 318 319 # Creates list of range dictionaries to put in the 'should' list. 320 range_list = [] 321 if range_constraints: 322 for key, low, high in range_constraints: 323 if low is None and high is None: 324 continue 325 temp_dict = {} 326 if low is not None: 327 temp_dict['gte'] = low 328 if high is not None: 329 temp_dict['lte'] = high 330 range_list.append( {'range': {key: temp_dict}}) 331 332 # Creates the list of term dictionaries to put in the 'should' list. 333 eq_list = [{'term': {k: v}} for k, v in equality_constraints if k] 334 batch_list = [{'terms': {k: v}} for k, v in batch_constraints if k] 335 regex_list = [{'regexp': {k: v}} for k, v in regex_constraints if k] 336 constraints = eq_list + batch_list + range_list + regex_list 337 query = { 338 'query': { 339 'bool': { 340 'must': constraints, 341 } 342 }, 343 } 344 if fields_returned: 345 query['fields'] = fields_returned 346 query['size'] = size 347 if sort_specs: 348 query['sort'] = sort_specs 349 return query 350 351 352 def execute_query(self, query): 353 """Makes a query on the given index. 354 355 @param query: query dictionary (see _compose_query) 356 @returns: A QueryResult instance describing the result. 357 358 Example output: 359 { 360 "took" : 5, 361 "timed_out" : false, 362 "_shards" : { 363 "total" : 16, 364 "successful" : 16, 365 "failed" : 0 366 }, 367 "hits" : { 368 "total" : 4, 369 "max_score" : 1.0, 370 "hits" : [ { 371 "_index" : "graphite_metrics2", 372 "_type" : "metric", 373 "_id" : "rtntrjgdsafdsfdsfdsfdsfdssssssss", 374 "_score" : 1.0, 375 "_source":{"target_type": "timer", 376 "host_id": 1, 377 "job_id": 22, 378 "time_start": 400} 379 }, { 380 "_index" : "graphite_metrics2", 381 "_type" : "metric", 382 "_id" : "dfgfddddddddddddddddddddddhhh", 383 "_score" : 1.0, 384 "_source":{"target_type": "timer", 385 "host_id": 2, 386 "job_id": 23, 387 "time_start": 405} 388 }, { 389 "_index" : "graphite_metrics2", 390 "_type" : "metric", 391 "_id" : "erwerwerwewtrewgfednvfngfngfrhfd", 392 "_score" : 1.0, 393 "_source":{"target_type": "timer", 394 "host_id": 3, 395 "job_id": 24, 396 "time_start": 4098} 397 }, { 398 "_index" : "graphite_metrics2", 399 "_type" : "metric", 400 "_id" : "dfherjgwetfrsupbretowegoegheorgsa", 401 "_score" : 1.0, 402 "_source":{"target_type": "timer", 403 "host_id": 22, 404 "job_id": 25, 405 "time_start": 4200} 406 } ] 407 } 408 } 409 410 """ 411 if not self.es.indices.exists(index=self.index): 412 logging.error('Index (%s) does not exist on %s:%s', 413 self.index, self.host, self.port) 414 return None 415 result = self.es.search(index=self.index, body=query) 416 # Check if all matched records are returned. It could be the size is 417 # set too small. Special case for size set to 1, as that means that 418 # the query cares about the first matched entry. 419 # TODO: Use pagination in Elasticsearch. This needs major change on how 420 # query results are iterated. 421 size = query.get('size', 1) 422 need_scroll = 'size' in query and size == DEFAULT_RESULT_SIZE 423 return_count = len(result['hits']['hits']) 424 total_match = result['hits']['total'] 425 if total_match > return_count and need_scroll: 426 logging.warn('There are %d matched records, only %d entries are ' 427 'returned. Query size is set to %d. Will try to use ' 428 'scroll command to get all entries.', total_match, 429 return_count, size) 430 # Try to get all results with scroll. 431 hits = self._get_results_by_scan(query, total_match) 432 else: 433 hits = result['hits']['hits'] 434 # Extract the actual results from the query. 435 output = QueryResult(total_match, []) 436 for hit in hits: 437 converted = {} 438 if 'fields' in hit: 439 for key, value in hit['fields'].items(): 440 converted[key] = value[0] if len(value)==1 else value 441 else: 442 converted = hit['_source'].copy() 443 output.hits.append(converted) 444 return output 445 446 447 def _get_results_by_scan(self, query, total_match=None): 448 """Get all results by using scan. 449 450 @param query: query dictionary (see _compose_query) 451 @param total_match: The number of total matched results. Pass the value 452 in so the code doesn't need to run another query to get it. 453 454 @returns: A list of matched results. 455 """ 456 if True or not total_match: 457 # Reduce the return size to make the query run faster. 458 query['size'] = 1 459 result = self.es.search(index=self.index, body=query) 460 total_match = result['hits']['total'] 461 # Remove the sort from query so scroll method can run faster. 462 sort = None 463 if 'sort' in query: 464 sort = query['sort'] 465 if len(sort) > 1: 466 raise EsUtilException('_get_results_by_scan does not support ' 467 'sort with more than one key: %s', sort) 468 del query['sort'] 469 del query['size'] 470 scroll = elasticsearch_helpers.scan(self.es, query=query, 471 index=self.index, 472 size=DEFAULT_SCROLL_SIZE) 473 hits = [] 474 next_mark = 0 475 for hit in scroll: 476 hits.append(hit) 477 downloaded_percent = 100 * float(len(hits))/total_match 478 if downloaded_percent > next_mark: 479 logging.debug('%2.0f%% downloaded (%d)', downloaded_percent, 480 len(hits)) 481 next_mark += 5 482 logging.debug('Number of hits found: %s', len(hits)) 483 484 if sort: 485 logging.debug('Sort hits with rule: %s', sort) 486 sort_key = sort[0].keys()[0] 487 is_desc = sort[0].values()[0] == 'desc' 488 # If the query has `fields` specified, the dict of hit stores value 489 # in hit['fields'], otherwise, the keyvals are stored in 490 # hit['_source']. 491 key = lambda hit:(hit['_source'][sort_key] if '_source' in hit else 492 hit['fields'][sort_key][0]) 493 hits = sorted(hits, key=key, reverse=is_desc) 494 495 return hits 496 497 498 def query(self, *args, **kwargs): 499 """The arguments to this function are the same as _compose_query.""" 500 query = self._compose_query(*args, **kwargs) 501 return self.execute_query(query) 502