# from copy import deepcopy # from datetime import datetime, timedelta from django.conf import settings from elasticsearch import AsyncElasticsearch, Elasticsearch from elasticsearch.exceptions import NotFoundError, RequestError from core.db import StorageBackend, add_defaults # from json import dumps # pp = lambda x: print(dumps(x, indent=2)) from core.db.processing import parse_results from core.lib.parsing import ( QueryError, parse_date_time, parse_index, parse_rule, parse_sentiment, parse_size, parse_sort, parse_source, ) # These are sometimes numeric, sometimes strings. # If they are seen to be numeric first, ES will erroneously # index them as "long" and then subsequently fail to index messages # with strings in the field. keyword_fields = ["nick_id", "user_id", "net_id"] mapping = { "mappings": { "properties": { "ts": {"type": "date", "format": "epoch_second"}, "match_ts": {"type": "date", "format": "iso8601"}, "file_tim": {"type": "date", "format": "epoch_millis"}, "rule_uuid": {"type": "keyword"}, } } } for field in keyword_fields: mapping["mappings"]["properties"][field] = {"type": "text"} class ElasticsearchBackend(StorageBackend): def __init__(self): super().__init__("Elasticsearch") self.client = None self.async_client = None def initialise(self, **kwargs): """ Inititialise the Elasticsearch API endpoint. """ auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD) client = Elasticsearch( settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False ) self.client = client async def async_initialise(self, **kwargs): """ Inititialise the Elasticsearch API endpoint in async mode. """ global mapping auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD) client = AsyncElasticsearch( settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False ) self.async_client = client # Create the rule storage indices if await client.indices.exists(index=settings.INDEX_RULE_STORAGE): await client.indices.put_mapping( index=settings.INDEX_RULE_STORAGE, properties=mapping["mappings"]["properties"], ) else: await client.indices.create( index=settings.INDEX_RULE_STORAGE, mappings=mapping["mappings"] ) def construct_context_query( self, index, net, channel, src, num, size, type=None, nicks=None ): # Get the initial query query = self.construct_query(None, size, blank=True) extra_must = [] extra_should = [] extra_should2 = [] if num: extra_must.append({"match_phrase": {"num": num}}) if net: extra_must.append({"match_phrase": {"net": net}}) if channel: extra_must.append({"match": {"channel": channel}}) if nicks: for nick in nicks: extra_should2.append({"match": {"nick": nick}}) types = ["msg", "notice", "action", "kick", "topic", "mode"] fields = [ "nick", "ident", "host", "channel", "ts", "msg", "type", "net", "src", "tokens", ] query["fields"] = fields if index == "internal": fields.append("mtype") if channel == "*status" or type == "znc": if {"match": {"channel": channel}} in extra_must: extra_must.remove({"match": {"channel": channel}}) extra_should2 = [] # Type is one of msg or notice # extra_should.append({"match": {"mtype": "msg"}}) # extra_should.append({"match": {"mtype": "notice"}}) extra_should.append({"match": {"type": "znc"}}) extra_should.append({"match": {"type": "self"}}) extra_should2.append({"match": {"type": "znc"}}) extra_should2.append({"match": {"nick": channel}}) elif type == "auth": if {"match": {"channel": channel}} in extra_must: extra_must.remove({"match": {"channel": channel}}) extra_should2 = [] extra_should2.append({"match": {"nick": channel}}) # extra_should2.append({"match": {"mtype": "msg"}}) # extra_should2.append({"match": {"mtype": "notice"}}) extra_should.append({"match": {"type": "query"}}) extra_should2.append({"match": {"type": "self"}}) extra_should.append({"match": {"nick": channel}}) else: for ctype in types: extra_should.append({"match": {"mtype": ctype}}) else: for ctype in types: extra_should.append({"match": {"type": ctype}}) # query = { # "index": index, # "limit": size, # "query": { # "bool": { # "must": [ # # {"equals": {"src": src}}, # # { # # "bool": { # # "should": [*extra_should], # # } # # }, # # { # # "bool": { # # "should": [*extra_should2], # # } # # }, # *extra_must, # ] # } # }, # "fields": fields, # # "_source": False, # } if extra_must: for x in extra_must: query["query"]["bool"]["must"].append(x) if extra_should: query["query"]["bool"]["must"].append({"bool": {"should": [*extra_should]}}) if extra_should2: query["query"]["bool"]["must"].append( {"bool": {"should": [*extra_should2]}} ) return query def construct_query(self, query, size=None, blank=False, **kwargs): """ Accept some query parameters and construct an Elasticsearch query. """ query_base = { # "size": size, "query": {"bool": {"must": []}}, } if size: query_base["size"] = size query_string = { "query_string": { "query": query, # "fields": fields, # "default_field": "msg", # "type": "best_fields", "fuzziness": "AUTO", "fuzzy_transpositions": True, "fuzzy_max_expansions": 50, "fuzzy_prefix_length": 0, # "minimum_should_match": 1, "default_operator": "and", "analyzer": "standard", "lenient": True, "boost": 1, "allow_leading_wildcard": True, # "enable_position_increments": False, "phrase_slop": 3, # "max_determinized_states": 10000, "quote_field_suffix": "", "quote_analyzer": "standard", "analyze_wildcard": False, "auto_generate_synonyms_phrase_query": True, } } if not blank: query_base["query"]["bool"]["must"].append(query_string) return query_base def parse(self, response, **kwargs): parsed = parse_results(response, **kwargs) return parsed def run_query(self, user, search_query, **kwargs): """ Low level helper to run an ES query. Accept a user to pass it to the filter, so we can avoid filtering for superusers. Accept fields and size, for the fields we want to match and the number of results to return. """ if self.client is None: self.initialise() index = kwargs.get("index") try: response = self.client.search(body=search_query, index=index) except RequestError as err: print("Elasticsearch error", err) return err except NotFoundError as err: print("Elasticsearch error", err) return err return response async def async_run_query(self, user, search_query, **kwargs): """ Low level helper to run an ES query. Accept a user to pass it to the filter, so we can avoid filtering for superusers. Accept fields and size, for the fields we want to match and the number of results to return. """ if self.async_client is None: await self.async_initialise() index = kwargs.get("index") try: response = await self.async_client.search(body=search_query, index=index) except RequestError as err: print("Elasticsearch error", err) return err except NotFoundError as err: print("Elasticsearch error", err) return err return response async def async_store_matches(self, matches): """ Store a list of matches in Elasticsearch. :param index: The index to store the matches in. :param matches: A list of matches to store. """ if self.async_client is None: await self.async_initialise() for match in matches: result = await self.async_client.index( index=settings.INDEX_RULE_STORAGE, body=match ) if not result["result"] == "created": self.log.error(f"Indexing failed: {result}") self.log.debug(f"Indexed {len(matches)} messages in ES") def store_matches(self, matches): """ Store a list of matches in Elasticsearch. :param index: The index to store the matches in. :param matches: A list of matches to store. """ if self.client is None: self.initialise() for match in matches: result = self.client.index(index=settings.INDEX_RULE_STORAGE, body=match) if not result["result"] == "created": self.log.error(f"Indexing failed: {result}") self.log.debug(f"Indexed {len(matches)} messages in ES") def prepare_schedule_query(self, rule_object): """ Helper to run a scheduled query with reduced functionality. """ data = rule_object.parsed if "tags" in data: tags = data["tags"] else: tags = [] if "query" in data: query = data["query"][0] data["query"] = query add_bool = [] add_top = [] if "source" in data: total_count = len(data["source"]) total_sources = len(settings.MAIN_SOURCES) + len( settings.SOURCES_RESTRICTED ) if total_count != total_sources: add_top_tmp = {"bool": {"should": []}} for source_iter in data["source"]: add_top_tmp["bool"]["should"].append( {"match_phrase": {"src": source_iter}} ) add_top.append(add_top_tmp) for field, values in data.items(): if field not in ["source", "index", "tags", "query", "sentiment"]: for value in values: add_top.append({"match": {field: value}}) # Bypass the check for query and tags membership since we can search by msg, etc search_query = self.parse_query( data, tags, None, False, add_bool, bypass_check=True ) if rule_object.window is not None: range_query = { "range": { "ts": { "gte": f"now-{rule_object.window}/d", "lte": "now/d", } } } add_top.append(range_query) self.add_bool(search_query, add_bool) self.add_top(search_query, add_top) if "sentiment" in data: search_query["aggs"] = { "avg_sentiment": { "avg": {"field": "sentiment"}, } } return search_query def schedule_check_aggregations(self, rule_object, result_map): """ Check the results of a scheduled query for aggregations. """ for index, (meta, result) in result_map.items(): # Default to true, if no aggs are found, we still want to match match = True for agg_name, (operator, number) in rule_object.aggs.items(): if agg_name in meta: agg_value = meta["aggs"][agg_name]["value"] # TODO: simplify this, match is default to True if operator == ">": if agg_value > number: match = True else: match = False elif operator == "<": if agg_value < number: match = True else: match = False elif operator == "=": if agg_value == number: match = True else: match = False else: match = False else: # No aggregation found, but it is required match = False result_map[index][0]["aggs"][agg_name]["match"] = match return result_map def schedule_query_results_test_sync(self, rule_object): """ Helper to run a scheduled query test with reduced functionality. Sync version for running from Django forms. Does not return results. """ data = rule_object.parsed search_query = self.prepare_schedule_query(rule_object) for index in data["index"]: if "message" in search_query: self.log.error(f"Error parsing test query: {search_query['message']}") continue response = self.run_query( rule_object.user, search_query, index=index, ) self.log.debug(f"Running scheduled test query on {index}: {search_query}") # self.log.debug(f"Response from scheduled query: {response}") if isinstance(response, Exception): error = response.info["error"]["root_cause"][0]["reason"] self.log.error(f"Error running test scheduled search: {error}") raise QueryError(error) async def schedule_query_results(self, rule_object): """ Helper to run a scheduled query with reduced functionality and async. """ result_map = {} data = rule_object.parsed search_query = self.prepare_schedule_query(rule_object) for index in data["index"]: if "message" in search_query: self.log.error(f"Error parsing query: {search_query['message']}") continue response = await self.async_run_query( rule_object.user, search_query, index=index, ) self.log.debug(f"Running scheduled query on {index}: {search_query}") # self.log.debug(f"Response from scheduled query: {response}") if isinstance(response, Exception): error = response.info["error"]["root_cause"][0]["reason"] self.log.error(f"Error running scheduled search: {error}") raise QueryError(error) if len(response["hits"]["hits"]) == 0: # No results, skip continue meta, response = self.parse(response, meta=True) # print("Parsed response", response) if "message" in response: self.log.error(f"Error running scheduled search: {response['message']}") continue result_map[index] = (meta, response) # Average aggregation check # Could probably do this in elasticsearch result_map = self.schedule_check_aggregations(rule_object, result_map) return result_map def query_results( self, request, query_params, size=None, annotate=True, custom_query=False, reverse=False, dedup=False, dedup_fields=None, tags=None, ): add_bool = [] add_top = [] add_top_negative = [] add_defaults(query_params) # Now, run the helpers for SIQTSRSS/ADR # S - Size # I - Index # Q - Query # T - Tags # S - Source # R - Ranges # S - Sort # S - Sentiment # A - Annotate # D - Dedup # R - Reverse # S - Size if request.user.is_anonymous: sizes = settings.MAIN_SIZES_ANON else: sizes = settings.MAIN_SIZES if not size: size = parse_size(query_params, sizes) if isinstance(size, dict): return size rule_object = parse_rule(request.user, query_params) if isinstance(rule_object, dict): return rule_object if rule_object is not None: index = settings.INDEX_RULE_STORAGE add_bool.append({"rule_uuid": str(rule_object.id)}) else: # I - Index index = parse_index(request.user, query_params) if isinstance(index, dict): return index # Q/T - Query/Tags search_query = self.parse_query( query_params, tags, size, custom_query, add_bool ) # Query should be a dict, so check if it contains message here if "message" in search_query: return search_query # S - Sources sources = parse_source(request.user, query_params) if isinstance(sources, dict): return sources total_count = len(sources) total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED) if total_count != total_sources: add_top_tmp = {"bool": {"should": []}} for source_iter in sources: add_top_tmp["bool"]["should"].append( {"match_phrase": {"src": source_iter}} ) add_top.append(add_top_tmp) # R - Ranges # date_query = False from_ts, to_ts = parse_date_time(query_params) if from_ts: range_query = { "range": { "ts": { "gt": from_ts, "lt": to_ts, } } } add_top.append(range_query) # S - Sort sort = parse_sort(query_params) if isinstance(sort, dict): return sort if sort: # For Druid compatibility sort_map = {"ascending": "asc", "descending": "desc"} sorting = [ { "ts": { "order": sort_map[sort], } } ] search_query["sort"] = sorting # S - Sentiment sentiment_r = parse_sentiment(query_params) if isinstance(sentiment_r, dict): return sentiment_r if sentiment_r: sentiment_method, sentiment = sentiment_r range_query_compare = {"range": {"sentiment": {}}} range_query_precise = { "match": { "sentiment": None, } } if sentiment_method == "below": range_query_compare["range"]["sentiment"]["lt"] = sentiment add_top.append(range_query_compare) elif sentiment_method == "above": range_query_compare["range"]["sentiment"]["gt"] = sentiment add_top.append(range_query_compare) elif sentiment_method == "exact": range_query_precise["match"]["sentiment"] = sentiment add_top.append(range_query_precise) elif sentiment_method == "nonzero": range_query_precise["match"]["sentiment"] = 0 add_top_negative.append(range_query_precise) # Add in the additional information we already populated self.add_bool(search_query, add_bool) self.add_top(search_query, add_top) self.add_top(search_query, add_top_negative, negative=True) response = self.query( request.user, search_query, index=index, ) if "message" in response: return response # A/D/R - Annotate/Dedup/Reverse response["object_list"] = self.process_results( response["object_list"], annotate=annotate, dedup=dedup, dedup_fields=dedup_fields, reverse=reverse, ) context = response return context def query_single_result(self, request, query_params): context = self.query_results(request, query_params, size=100) if not context: return {"message": "Failed to run query", "message_class": "danger"} if "message" in context: return context dedup_set = {item["nick"] for item in context["object_list"]} if dedup_set: context["item"] = context["object_list"][0] return context def add_bool(self, search_query, add_bool): """ Add the specified boolean matches to search query. """ if not add_bool: return for item in add_bool: search_query["query"]["bool"]["must"].append({"match_phrase": item}) def add_top(self, search_query, add_top, negative=False): """ Merge add_top with the base of the search_query. """ if not add_top: return if negative: for item in add_top: if "must_not" in search_query["query"]["bool"]: search_query["query"]["bool"]["must_not"].append(item) else: search_query["query"]["bool"]["must_not"] = [item] else: for item in add_top: if "query" not in search_query: search_query["query"] = {"bool": {"must": []}} search_query["query"]["bool"]["must"].append(item)