neptune/core/db/elastic.py

524 lines
18 KiB
Python
Raw Normal View History

2022-09-27 14:15:08 +00:00
# from copy import deepcopy
# from datetime import datetime, timedelta
from django.conf import settings
2022-11-21 07:20:29 +00:00
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError, RequestError
2022-09-27 14:15:08 +00:00
from core.db import StorageBackend, add_defaults
2022-09-27 14:15:08 +00:00
# from json import dumps
# pp = lambda x: print(dumps(x, indent=2))
2022-11-23 18:15:42 +00:00
from core.db.processing import parse_results
from core.lib.parsing import (
QueryError,
parse_date_time,
parse_index,
parse_sentiment,
parse_size,
parse_sort,
parse_source,
)
2022-09-27 14:15:08 +00:00
class ElasticsearchBackend(StorageBackend):
2022-09-27 14:15:08 +00:00
def __init__(self):
    """
    Initialise the parent class, passing it the string "Elasticsearch".
    """
    super().__init__("Elasticsearch")
2022-09-27 14:15:08 +00:00
def initialise(self, **kwargs):
    """
    Initialise the Elasticsearch API endpoint.

    Builds an ``Elasticsearch`` client for ``settings.ELASTICSEARCH_URL``,
    authenticating with ``settings.ELASTICSEARCH_USERNAME`` and
    ``settings.ELASTICSEARCH_PASSWORD``, and stores it on ``self.client``.

    :param kwargs: accepted but not used by this method.
    """
    auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
    # NOTE(review): verify_certs=False turns off TLS certificate verification.
    # Left as-is to preserve behaviour; confirm it is intentional.
    self.client = Elasticsearch(
        settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False
    )
2022-11-23 18:15:42 +00:00
def construct_context_query(
    self, index, net, channel, src, num, size, type=None, nicks=None
):
    """
    Build an Elasticsearch query for a context lookup.

    Starts from the blank query returned by ``construct_query`` and appends
    ``must`` clauses for ``num``, ``net`` and ``channel`` (when truthy) plus up
    to two ``should`` groups that restrict the message type and the nicks.

    :param index: index name; ``"internal"`` switches to the ``mtype`` based
        filters and adds ``mtype`` to the returned fields.
    :param net: value for a ``match_phrase`` on ``net``.
    :param channel: value for a ``match`` on ``channel``; for the ``znc`` and
        ``auth`` cases it is matched against ``nick`` instead.
    :param src: accepted but not used by this method.
    :param num: value for a ``match_phrase`` on ``num``.
    :param size: passed through to ``construct_query``.
    :param type: ``"znc"`` or ``"auth"`` select special handling on the
        internal index.
    :param nicks: optional iterable of nicks, any of which may match.
    :return: the query dict, including a ``fields`` list.
    """
    # Get the initial (blank) query
    query = self.construct_query(None, size, blank=True)

    channel_match = {"match": {"channel": channel}}
    extra_must = []
    extra_should = []
    extra_should2 = []

    if num:
        extra_must.append({"match_phrase": {"num": num}})
    if net:
        extra_must.append({"match_phrase": {"net": net}})
    if channel:
        extra_must.append(channel_match)
    if nicks:
        extra_should2.extend({"match": {"nick": nick}} for nick in nicks)

    types = ["msg", "notice", "action", "kick", "topic", "mode"]
    fields = [
        "nick",
        "ident",
        "host",
        "channel",
        "ts",
        "msg",
        "type",
        "net",
        "src",
        "tokens",
    ]

    if index == "internal":
        fields.append("mtype")
        if channel == "*status" or type == "znc":
            # The channel is not a real channel here: match it as a nick instead
            if channel_match in extra_must:
                extra_must.remove(channel_match)
            extra_should = [
                {"match": {"type": "znc"}},
                {"match": {"type": "self"}},
            ]
            extra_should2 = [
                {"match": {"type": "znc"}},
                {"match": {"nick": channel}},
            ]
        elif type == "auth":
            if channel_match in extra_must:
                extra_must.remove(channel_match)
            extra_should = [
                {"match": {"type": "query"}},
                {"match": {"nick": channel}},
            ]
            extra_should2 = [
                {"match": {"nick": channel}},
                {"match": {"type": "self"}},
            ]
        else:
            # "equals" is not an Elasticsearch query type; use "match" like
            # every other clause built here.
            extra_should.extend({"match": {"mtype": ctype}} for ctype in types)
    else:
        extra_should.extend({"match": {"type": ctype}} for ctype in types)

    # Assigned after the "mtype" append so the result does not depend on aliasing
    query["fields"] = fields

    must = query["query"]["bool"]["must"]
    must.extend(extra_must)
    if extra_should:
        must.append({"bool": {"should": [*extra_should]}})
    if extra_should2:
        must.append({"bool": {"should": [*extra_should2]}})
    return query
2023-01-15 23:02:13 +00:00
def construct_query(self, query, size=None, blank=False, **kwargs):
    """
    Accept some query parameters and construct an Elasticsearch query.

    :param query: text placed in the ``query_string`` clause.
    :param size: optional result limit; the ``size`` key is only set when
        this is truthy.
    :param blank: when true, return only the empty ``bool``/``must`` skeleton
        without a ``query_string`` clause.
    :param kwargs: accepted but not used by this method.
    :return: dict of the form ``{"query": {"bool": {"must": [...]}}}`` with an
        optional ``size`` key.
    """
    query_base = {
        "query": {"bool": {"must": []}},
    }
    if size:
        query_base["size"] = size
    if blank:
        return query_base

    query_string = {
        "query_string": {
            "query": query,
            "fuzziness": "AUTO",
            "fuzzy_transpositions": True,
            "fuzzy_max_expansions": 50,
            "fuzzy_prefix_length": 0,
            "default_operator": "and",
            "analyzer": "standard",
            "lenient": True,
            "boost": 1,
            "allow_leading_wildcard": True,
            "phrase_slop": 3,
            "quote_field_suffix": "",
            "quote_analyzer": "standard",
            "analyze_wildcard": False,
            "auto_generate_synonyms_phrase_query": True,
        }
    }
    query_base["query"]["bool"]["must"].append(query_string)
    return query_base
def parse(self, response, **kwargs):
    """
    Hand ``response`` and any keyword options to ``parse_results`` and return
    whatever it produces.
    """
    return parse_results(response, **kwargs)
def run_query(self, user, search_query, **kwargs):
    """
    Low level helper to run an ES query.

    :param user: accepted for interface compatibility; not used here.
    :param search_query: request body passed to the client's ``search``.
    :param kwargs: ``index`` is read and passed to ``search``; any other
        keyword is ignored.
    :return: the client's response, or the caught ``RequestError`` /
        ``NotFoundError`` instance (returned, not raised).
    """
    index = kwargs.get("index")
    try:
        return self.client.search(body=search_query, index=index)
    except (RequestError, NotFoundError) as err:
        # Callers detect failure with isinstance(response, Exception)
        self.log.error(f"Elasticsearch error: {err}")
        return err
async def async_run_query(self, user, search_query, **kwargs):
    """
    Low level helper to run an ES query from async code.

    The client's ``search`` is called synchronously, so it is dispatched to
    the event loop's default executor rather than run inline, which would
    block the loop for the duration of the request.

    :param user: accepted for interface compatibility; not used here.
    :param search_query: request body passed to the client's ``search``.
    :param kwargs: ``index`` is read and passed to ``search``; any other
        keyword is ignored.
    :return: the client's response, or the caught ``RequestError`` /
        ``NotFoundError`` instance (returned, not raised).
    """
    import asyncio
    from functools import partial

    index = kwargs.get("index")
    search = partial(self.client.search, body=search_query, index=index)
    try:
        return await asyncio.get_running_loop().run_in_executor(None, search)
    except (RequestError, NotFoundError) as err:
        # Callers detect failure with isinstance(response, Exception)
        self.log.error(f"Elasticsearch error: {err}")
        return err
async def schedule_query_results(self, rule_object):
    """
    Helper to run a scheduled query with reduced functionality and async.

    Builds a single search from ``rule_object.parsed``, runs it against every
    index listed under its ``index`` key and then evaluates
    ``rule_object.aggs`` against the aggregations parsed for each index.

    :param rule_object: object providing ``parsed`` (dict of field to list of
        values), ``window``, ``user`` and ``aggs`` (mapping of aggregation
        name to an ``(operator, number)`` pair, operator one of ``>``, ``<``,
        ``=``).
    :return: dict mapping index name to ``(aggs, response)`` for each index
        that returned hits; every checked aggregation gets a boolean
        ``match`` entry. Empty if the query could not be parsed.
    :raises QueryError: if running the search on an index returned an error.
    """
    # Shallow copy: "query" is unwrapped below and that must not leak back
    # into rule_object.parsed, or a later run would unwrap a string.
    data = dict(rule_object.parsed)
    tags = data.get("tags", [])
    if "query" in data:
        data["query"] = data["query"][0]

    result_map = {}
    add_bool = []
    add_top = []

    if "source" in data:
        total_count = len(data["source"])
        total_sources = len(settings.MAIN_SOURCES) + len(
            settings.SOURCES_RESTRICTED
        )
        # Only filter by source when a subset of all sources is selected
        if total_count != total_sources:
            source_matches = [
                {"match_phrase": {"src": source_iter}}
                for source_iter in data["source"]
            ]
            add_top.append({"bool": {"should": source_matches}})

    # Every remaining field becomes a set of plain match clauses
    handled_fields = ("source", "index", "tags", "query", "sentiment")
    for field, values in data.items():
        if field not in handled_fields:
            for value in values:
                add_top.append({"match": {field: value}})

    # Bypass the check for query and tags membership since we can search by msg, etc
    search_query = self.parse_query(
        data, tags, None, False, add_bool, bypass_check=True
    )
    if "message" in search_query:
        # parse_query reported a problem; there is nothing to run on any index
        self.log.error(f"Error parsing query: {search_query['message']}")
        return result_map

    if rule_object.window is not None:
        range_query = {
            "range": {
                "ts": {
                    "gte": f"now-{rule_object.window}/d",
                    "lte": "now/d",
                }
            }
        }
        add_top.append(range_query)
    self.add_bool(search_query, add_bool)
    self.add_top(search_query, add_top)

    if "sentiment" in data:
        search_query["aggs"] = {
            "avg_sentiment": {
                "avg": {"field": "sentiment"},
            }
        }

    for index in data["index"]:
        self.log.debug(f"Running scheduled query on {index}: {search_query}")
        response = await self.async_run_query(
            rule_object.user,
            search_query,
            index=index,
        )
        self.log.debug(f"Response from scheduled query: {response}")
        if isinstance(response, Exception):
            error = response.info["error"]["root_cause"][0]["reason"]
            self.log.error(f"Error running scheduled search: {error}")
            raise QueryError(error)
        if not response["hits"]["hits"]:
            # No results, skip
            continue
        aggs, response = self.parse(response, aggs=True)
        if "message" in response:
            self.log.error(f"Error running scheduled search: {response['message']}")
            continue
        result_map[index] = (aggs, response)

    # Average aggregation check
    # Could probably do this in elasticsearch
    for aggs, _ in result_map.values():
        for agg_name, (operator, number) in rule_object.aggs.items():
            if agg_name in aggs:
                agg_value = aggs[agg_name]["value"]
                if operator == ">":
                    match = bool(agg_value > number)
                elif operator == "<":
                    match = bool(agg_value < number)
                elif operator == "=":
                    match = bool(agg_value == number)
                else:
                    match = False
            else:
                # No aggregation found, but it is required
                match = False
            # setdefault: a missing aggregation must be recorded as a
            # non-match instead of raising KeyError
            aggs.setdefault(agg_name, {})["match"] = match
    return result_map
2022-09-27 14:15:08 +00:00
def query_results(
    self,
    request,
    query_params,
    size=None,
    annotate=True,
    custom_query=False,
    reverse=False,
    dedup=False,
    dedup_fields=None,
    tags=None,
):
    """
    Build a search from ``query_params``, run it and post-process the hits.

    Each parsing helper may hand back a dict instead of its normal value; such
    a dict is returned to the caller unchanged, as is any query or response
    dict that contains a ``message`` key.

    :param request: request whose ``user`` is used for size limits, index and
        source selection and for running the query.
    :param query_params: dict of search parameters; defaults are filled in by
        ``add_defaults``.
    :param size: result limit; parsed from ``query_params`` when falsy.
    :param annotate: passed to ``process_results``.
    :param custom_query: passed to ``parse_query``.
    :param reverse: passed to ``process_results``.
    :param dedup: passed to ``process_results``.
    :param dedup_fields: passed to ``process_results``.
    :param tags: passed to ``parse_query``.
    :return: the response dict with a processed ``object_list``, or a dict
        produced by one of the helpers as described above.
    """
    add_bool = []
    add_top = []
    add_top_negative = []

    add_defaults(query_params)

    # Now, run the helpers for SIQTSRSS/ADR
    # S - Size
    # I - Index
    # Q - Query
    # T - Tags
    # S - Source
    # R - Ranges
    # S - Sort
    # S - Sentiment
    # A - Annotate
    # D - Dedup
    # R - Reverse

    # S - Size
    if request.user.is_anonymous:
        sizes = settings.MAIN_SIZES_ANON
    else:
        sizes = settings.MAIN_SIZES
    if not size:
        size = parse_size(query_params, sizes)
        if isinstance(size, dict):
            return size

    # I - Index
    index = parse_index(request.user, query_params)
    if isinstance(index, dict):
        return index

    # Q/T - Query/Tags
    search_query = self.parse_query(
        query_params, tags, size, custom_query, add_bool
    )
    # Query should be a dict, so check if it contains message here
    if "message" in search_query:
        return search_query

    # S - Sources
    sources = parse_source(request.user, query_params)
    if isinstance(sources, dict):
        return sources
    total_count = len(sources)
    total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
    # Only filter by source when a subset of all sources is selected
    if total_count != total_sources:
        source_matches = [
            {"match_phrase": {"src": source_iter}} for source_iter in sources
        ]
        add_top.append({"bool": {"should": source_matches}})

    # R - Ranges
    from_ts, to_ts = parse_date_time(query_params)
    if from_ts:
        range_query = {
            "range": {
                "ts": {
                    "gt": from_ts,
                    "lt": to_ts,
                }
            }
        }
        add_top.append(range_query)

    # S - Sort
    sort = parse_sort(query_params)
    if isinstance(sort, dict):
        return sort
    if sort:
        # For Druid compatibility
        sort_map = {"ascending": "asc", "descending": "desc"}
        search_query["sort"] = [
            {
                "ts": {
                    "order": sort_map[sort],
                }
            }
        ]

    # S - Sentiment
    sentiment_r = parse_sentiment(query_params)
    if isinstance(sentiment_r, dict):
        return sentiment_r
    if sentiment_r:
        sentiment_method, sentiment = sentiment_r
        if sentiment_method == "below":
            add_top.append({"range": {"sentiment": {"lt": sentiment}}})
        elif sentiment_method == "above":
            add_top.append({"range": {"sentiment": {"gt": sentiment}}})
        elif sentiment_method == "exact":
            add_top.append({"match": {"sentiment": sentiment}})
        elif sentiment_method == "nonzero":
            # Expressed as "must not match 0"
            add_top_negative.append({"match": {"sentiment": 0}})

    # Add in the additional information we already populated
    self.add_bool(search_query, add_bool)
    self.add_top(search_query, add_top)
    self.add_top(search_query, add_top_negative, negative=True)

    response = self.query(
        request.user,
        search_query,
        index=index,
    )
    if "message" in response:
        return response

    # A/D/R - Annotate/Dedup/Reverse
    response["object_list"] = self.process_results(
        response["object_list"],
        annotate=annotate,
        dedup=dedup,
        dedup_fields=dedup_fields,
        reverse=reverse,
    )
    return response
def query_single_result(self, request, query_params):
    """
    Run ``query_results`` with a size of 100 and expose the first hit.

    :param request: passed through to ``query_results``.
    :param query_params: passed through to ``query_results``.
    :return: a failure message dict when ``query_results`` returns nothing;
        its result unchanged when that contains ``message``; otherwise the
        result with ``item`` set to the first entry of ``object_list`` (when
        there is one).
    """
    context = self.query_results(request, query_params, size=100)
    if not context:
        return {"message": "Failed to run query", "message_class": "danger"}
    if "message" in context:
        return context
    # Check the list itself; hits are not required to carry a "nick" key
    object_list = context["object_list"]
    if object_list:
        context["item"] = object_list[0]
    return context
2022-11-23 18:15:42 +00:00
def add_bool(self, search_query, add_bool):
    """
    Add the specified boolean matches to search query.

    Each item is wrapped in a ``match_phrase`` clause and appended to
    ``search_query["query"]["bool"]["must"]``; missing levels of that path are
    created. ``search_query`` is modified in place.

    :param search_query: query dict to extend.
    :param add_bool: iterable of ``{field: value}`` dicts; nothing happens
        when it is empty or ``None``.
    """
    if not add_bool:
        return
    bool_query = search_query.setdefault("query", {}).setdefault("bool", {})
    bool_query.setdefault("must", []).extend(
        {"match_phrase": item} for item in add_bool
    )
def add_top(self, search_query, add_top, negative=False):
    """
    Merge add_top with the base of the search_query.

    Items are appended to ``search_query["query"]["bool"]["must"]``, or to
    ``["must_not"]`` when ``negative`` is true; missing levels of that path
    are created. ``search_query`` is modified in place.

    :param search_query: query dict to extend.
    :param add_top: list of complete query clauses; nothing happens when it
        is empty or ``None``.
    :param negative: add the clauses as exclusions instead of requirements.
    """
    if not add_top:
        return
    clause = "must_not" if negative else "must"
    bool_query = search_query.setdefault("query", {}).setdefault("bool", {})
    bool_query.setdefault(clause, []).extend(add_top)