# neptune/core/db/elastic.py
# from copy import deepcopy
# from datetime import datetime, timedelta
from django.conf import settings
from elasticsearch import AsyncElasticsearch, Elasticsearch
from elasticsearch.exceptions import NotFoundError, RequestError
from core.db import StorageBackend, add_defaults
# from json import dumps
# pp = lambda x: print(dumps(x, indent=2))
from core.db.processing import parse_results
from core.lib.parsing import (
QueryError,
parse_date_time,
parse_index,
parse_rule,
parse_sentiment,
parse_size,
parse_sort,
parse_source,
)
# These are sometimes numeric, sometimes strings.
# If they are seen to be numeric first, ES will erroneously
# index them as "long" and then subsequently fail to index messages
# with strings in the field.
keyword_fields = ["nick_id", "user_id", "net_id"]
mapping = {
"mappings": {
"properties": {
"ts": {"type": "date", "format": "epoch_second"},
"match_ts": {"type": "date", "format": "iso8601"},
"file_tim": {"type": "date", "format": "epoch_millis"},
"rule_id": {"type": "keyword"},
}
}
}
for field in keyword_fields:
mapping["mappings"]["properties"][field] = {"type": "text"}
class ElasticsearchBackend(StorageBackend):
def __init__(self):
super().__init__("elasticsearch")
self.client = None
self.async_client = None
def initialise(self, **kwargs):
"""
Initialise the Elasticsearch API endpoint.
"""
auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
client = Elasticsearch(
settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False
)
self.client = client
async def async_initialise(self, **kwargs):
"""
Initialise the Elasticsearch API endpoint in async mode.
"""
auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
client = AsyncElasticsearch(
settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False
)
self.async_client = client
# Create the rule storage index, or update its mapping if it already exists
if await client.indices.exists(index=settings.INDEX_RULE_STORAGE):
await client.indices.put_mapping(
index=settings.INDEX_RULE_STORAGE,
properties=mapping["mappings"]["properties"],
)
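# NB: put_mapping is additive; it can add new fields to an existing
# index, but cannot change the type of a field that is already mapped.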
else:
await client.indices.create(
index=settings.INDEX_RULE_STORAGE, mappings=mapping["mappings"]
)
def delete_rule_entries(self, rule_id):
"""
Delete all entries for a given rule.
:param rule_id: The rule ID to delete.
"""
if self.client is None:
self.initialise()
search_query = self.construct_query(None, None, blank=True)
search_query["query"]["bool"]["must"].append(
{"match_phrase": {"rule_id": rule_id}}
)
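# The resulting delete-by-query body is roughly:
# {"query": {"bool": {"must": [{"match_phrase": {"rule_id": rule_id}}]}}}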
return self.client.delete_by_query(
index=settings.INDEX_RULE_STORAGE, body=search_query
)
def construct_context_query(
self, index, net, channel, src, num, size, type=None, nicks=None
):
# Get the initial query
query = self.construct_query(None, size, blank=True)
extra_must = []
extra_should = []
extra_should2 = []
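# extra_must clauses are ANDed into the query directly; extra_should and
# extra_should2 each become their own "should" group (see the bottom of
# this method), so the final query requires
# (any of extra_should) AND (any of extra_should2).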
if num:
extra_must.append({"match_phrase": {"num": num}})
if net:
extra_must.append({"match_phrase": {"net": net}})
if channel:
extra_must.append({"match": {"channel": channel}})
if nicks:
for nick in nicks:
extra_should2.append({"match": {"nick": nick}})
types = ["msg", "notice", "action", "kick", "topic", "mode"]
fields = [
"nick",
"ident",
"host",
"channel",
"ts",
"msg",
"type",
"net",
"src",
"tokens",
]
query["fields"] = fields
if index == "internal":
fields.append("mtype")
if channel == "*status" or type == "znc":
if {"match": {"channel": channel}} in extra_must:
extra_must.remove({"match": {"channel": channel}})
extra_should2 = []
# Type is one of msg or notice
# extra_should.append({"match": {"mtype": "msg"}})
# extra_should.append({"match": {"mtype": "notice"}})
extra_should.append({"match": {"type": "znc"}})
extra_should.append({"match": {"type": "self"}})
extra_should2.append({"match": {"type": "znc"}})
extra_should2.append({"match": {"nick": channel}})
elif type == "auth":
if {"match": {"channel": channel}} in extra_must:
extra_must.remove({"match": {"channel": channel}})
extra_should2 = []
extra_should2.append({"match": {"nick": channel}})
# extra_should2.append({"match": {"mtype": "msg"}})
# extra_should2.append({"match": {"mtype": "notice"}})
extra_should.append({"match": {"type": "query"}})
extra_should2.append({"match": {"type": "self"}})
extra_should.append({"match": {"nick": channel}})
else:
for ctype in types:
extra_should.append({"match": {"mtype": ctype}})
else:
for ctype in types:
extra_should.append({"match": {"type": ctype}})
# query = {
# "index": index,
# "limit": size,
# "query": {
# "bool": {
# "must": [
# # {"equals": {"src": src}},
# # {
# # "bool": {
# # "should": [*extra_should],
# # }
# # },
# # {
# # "bool": {
# # "should": [*extra_should2],
# # }
# # },
# *extra_must,
# ]
# }
# },
# "fields": fields,
# # "_source": False,
# }
if extra_must:
for x in extra_must:
query["query"]["bool"]["must"].append(x)
if extra_should:
query["query"]["bool"]["must"].append({"bool": {"should": [*extra_should]}})
if extra_should2:
query["query"]["bool"]["must"].append(
{"bool": {"should": [*extra_should2]}}
)
return query
def construct_query(self, query, size=None, blank=False, **kwargs):
"""
Accept some query parameters and construct an Elasticsearch query.
"""
query_base = {
# "size": size,
"query": {"bool": {"must": []}},
}
if size:
query_base["size"] = size
query_string = {
"query_string": {
"query": query,
# "fields": fields,
# "default_field": "msg",
# "type": "best_fields",
"fuzziness": "AUTO",
"fuzzy_transpositions": True,
"fuzzy_max_expansions": 50,
"fuzzy_prefix_length": 0,
# "minimum_should_match": 1,
"default_operator": "and",
"analyzer": "standard",
"lenient": True,
"boost": 1,
"allow_leading_wildcard": True,
# "enable_position_increments": False,
"phrase_slop": 3,
# "max_determinized_states": 10000,
"quote_field_suffix": "",
"quote_analyzer": "standard",
"analyze_wildcard": False,
"auto_generate_synonyms_phrase_query": True,
}
}
if not blank:
query_base["query"]["bool"]["must"].append(query_string)
return query_base
def parse(self, response, **kwargs):
parsed = parse_results(response, **kwargs)
return parsed
def run_query(self, user, search_query, **kwargs):
"""
Low level helper to run an ES query.
Accept a user to pass it to the filter, so we can
avoid filtering for superusers.
Accept fields and size, for the fields we want to match and the
number of results to return.
"""
if self.client is None:
self.initialise()
index = kwargs.get("index")
try:
response = self.client.search(body=search_query, index=index)
except RequestError as err:
self.log.error(f"Elasticsearch error: {err}")
return err
except NotFoundError as err:
self.log.error(f"Elasticsearch error: {err}")
return err
return response
async def async_run_query(self, user, search_query, **kwargs):
"""
Low level helper to run an ES query.
Accept a user to pass it to the filter, so we can
avoid filtering for superusers.
Accept fields and size, for the fields we want to match and the
number of results to return.
"""
if self.async_client is None:
await self.async_initialise()
index = kwargs.get("index")
try:
response = await self.async_client.search(body=search_query, index=index)
except RequestError as err:
self.log.error(f"Elasticsearch error: {err}")
return err
except NotFoundError as err:
self.log.error(f"Elasticsearch error: {err}")
return err
return response
async def async_store_matches(self, matches):
"""
Store a list of matches in Elasticsearch, in the rule storage index.
:param matches: A list of matches to store.
"""
if self.async_client is None:
await self.async_initialise()
for match in matches:
result = await self.async_client.index(
index=settings.INDEX_RULE_STORAGE, body=match
)
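# Since no explicit _id is supplied, ES auto-generates one, so a
# successful index call should always report {"result": "created"}.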
if not result["result"] == "created":
self.log.error(f"Indexing failed: {result}")
self.log.debug(f"Indexed {len(matches)} messages in ES")
def store_matches(self, matches):
"""
Store a list of matches in Elasticsearch, in the rule storage index.
:param matches: A list of matches to store.
"""
if self.client is None:
self.initialise()
for match in matches:
result = self.client.index(index=settings.INDEX_RULE_STORAGE, body=match)
if not result["result"] == "created":
self.log.error(f"Indexing failed: {result}")
self.log.debug(f"Indexed {len(matches)} messages in ES")
def prepare_schedule_query(self, rule_object):
"""
Helper to run a scheduled query with reduced functionality.
"""
data = rule_object.parsed
if "tags" in data:
tags = data["tags"]
else:
tags = []
if "query" in data:
query = data["query"][0]
data["query"] = query
add_bool = []
add_top = []
if "source" in data:
total_count = len(data["source"])
total_sources = len(settings.MAIN_SOURCES) + len(
settings.SOURCES_RESTRICTED
)
if total_count != total_sources:
add_top_tmp = {"bool": {"should": []}}
for source_iter in data["source"]:
add_top_tmp["bool"]["should"].append(
{"match_phrase": {"src": source_iter}}
)
add_top.append(add_top_tmp)
if "tokens" in data:
add_top_tmp = {"bool": {"should": []}}
for token in data["tokens"]:
add_top_tmp["bool"]["should"].append(
{"match_phrase": {"tokens": token}}
)
add_top.append(add_top_tmp)
for field, values in data.items():
if field not in ["source", "index", "tags", "query", "sentiment", "tokens"]:
for value in values:
add_top.append({"match": {field: value}})
# Bypass the check for query and tags membership since we can search by msg, etc
search_query = self.parse_query(
data, tags, None, False, add_bool, bypass_check=True
)
if rule_object.window is not None:
range_query = {
"range": {
"ts": {
"gte": f"now-{rule_object.window}",
"lte": "now",
}
}
}
add_top.append(range_query)
self.add_bool(search_query, add_bool)
self.add_top(search_query, add_top)
# if "sentiment" in data:
search_query["aggs"] = {
"avg_sentiment": {
"avg": {"field": "sentiment"},
}
}
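# In the raw ES response, the average appears under
# aggregations.avg_sentiment.value; parse() surfaces it to
# schedule_check_aggregations via meta["aggs"].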
return search_query
def schedule_check_aggregations(self, rule_object, result_map):
"""
Check the results of a scheduled query for aggregations.
"""
if rule_object.aggs is None:
return result_map
for index, (meta, result) in result_map.items():
# Default to True: if the rule defines no aggregations, the result still matches
match = True
for agg_name, (operator, number) in rule_object.aggs.items():
if agg_name in meta["aggs"]:
agg_value = meta["aggs"][agg_name]["value"]
# Compare the aggregation value against the rule's threshold
if operator == ">":
match = agg_value > number
elif operator == "<":
match = agg_value < number
elif operator == "=":
match = agg_value == number
else:
match = False
else:
# No aggregation found, but it is required
match = False
result_map[index][0]["aggs"][agg_name]["match"] = match
return result_map
def schedule_query_results_test_sync(self, rule_object):
"""
Helper to run a scheduled query test with reduced functionality.
Sync version for running from Django forms.
Does not return results.
"""
data = rule_object.parsed
search_query = self.prepare_schedule_query(rule_object)
for index in data["index"]:
if "message" in search_query:
self.log.error(f"Error parsing test query: {search_query['message']}")
continue
response = self.run_query(
rule_object.user,
search_query,
index=index,
)
self.log.debug(f"Running scheduled test query on {index}: {search_query}")
# self.log.debug(f"Response from scheduled query: {response}")
if isinstance(response, Exception):
error = response.info["error"]["root_cause"][0]["reason"]
self.log.error(f"Error running test scheduled search: {error}")
raise QueryError(error)
async def schedule_query_results(self, rule_object):
"""
Async helper to run a scheduled query with reduced functionality.
"""
result_map = {}
data = rule_object.parsed
search_query = self.prepare_schedule_query(rule_object)
for index in data["index"]:
if "message" in search_query:
self.log.error(f"Error parsing query: {search_query['message']}")
continue
response = await self.async_run_query(
rule_object.user,
search_query,
index=index,
)
self.log.debug(f"Running scheduled query on {index}: {search_query}")
# self.log.debug(f"Response from scheduled query: {response}")
if isinstance(response, Exception):
error = response.info["error"]["root_cause"][0]["reason"]
self.log.error(f"Error running scheduled search: {error}")
raise QueryError(error)
if len(response["hits"]["hits"]) == 0:
# No results, skip
result_map[index] = ({}, [])
continue
meta, response = self.parse(response, meta=True)
# print("Parsed response", response)
if "message" in response:
self.log.error(f"Error running scheduled search: {response['message']}")
continue
result_map[index] = (meta, response)
# Average aggregation check
# Could probably do this in elasticsearch
result_map = self.schedule_check_aggregations(rule_object, result_map)
return result_map
def query_results(
self,
request,
query_params,
size=None,
annotate=True,
custom_query=False,
reverse=False,
dedup=False,
dedup_fields=None,
tags=None,
):
add_bool = []
add_top = []
add_top_negative = []
add_defaults(query_params)
# Now, run the helpers for SIQTSRSS/ADR
# S - Size
# I - Index
# Q - Query
# T - Tags
# S - Source
# R - Ranges
# S - Sort
# S - Sentiment
# A - Annotate
# D - Dedup
# R - Reverse
# S - Size
if request.user.is_anonymous:
sizes = settings.MAIN_SIZES_ANON
else:
sizes = settings.MAIN_SIZES
if not size:
size = parse_size(query_params, sizes)
if isinstance(size, dict):
return size
rule_object = parse_rule(request.user, query_params)
if isinstance(rule_object, dict):
return rule_object
if rule_object is not None:
index = settings.INDEX_RULE_STORAGE
add_bool.append({"rule_id": str(rule_object.id)})
else:
# I - Index
index = parse_index(request.user, query_params)
if isinstance(index, dict):
return index
# Q/T - Query/Tags
search_query = self.parse_query(
query_params, tags, size, custom_query, add_bool
)
# On failure, parse_query returns a dict containing a "message" key
if "message" in search_query:
return search_query
# S - Sources
sources = parse_source(request.user, query_params)
if isinstance(sources, dict):
return sources
total_count = len(sources)
# Total is -1 due to the "all" source
total_sources = (
len(settings.MAIN_SOURCES) - 1 + len(settings.SOURCES_RESTRICTED)
)
# If the sources the user has access to are equal to all
# possible sources, then we don't need to add the source
# filter to the query.
if total_count != total_sources:
add_top_tmp = {"bool": {"should": []}}
for source_iter in sources:
add_top_tmp["bool"]["should"].append(
{"match_phrase": {"src": source_iter}}
)
if query_params["source"] != "all":
add_top.append(add_top_tmp)
# R - Ranges
# date_query = False
from_ts, to_ts = parse_date_time(query_params)
if from_ts:
range_query = {
"range": {
"ts": {
"gt": from_ts,
"lt": to_ts,
}
}
}
add_top.append(range_query)
# S - Sort
sort = parse_sort(query_params)
if isinstance(sort, dict):
return sort
if rule_object is not None:
field = "match_ts"
else:
field = "ts"
if sort:
# For Druid compatibility
sort_map = {"ascending": "asc", "descending": "desc"}
sorting = [
{
field: {
"order": sort_map[sort],
}
}
]
search_query["sort"] = sorting
# S - Sentiment
sentiment_r = parse_sentiment(query_params)
if isinstance(sentiment_r, dict):
return sentiment_r
if sentiment_r:
if rule_object is not None:
sentiment_index = "meta.aggs.avg_sentiment.value"
else:
sentiment_index = "sentiment"
sentiment_method, sentiment = sentiment_r
range_query_compare = {"range": {sentiment_index: {}}}
range_query_precise = {
"match": {
sentiment_index: None,
}
}
if sentiment_method == "below":
range_query_compare["range"][sentiment_index]["lt"] = sentiment
add_top.append(range_query_compare)
elif sentiment_method == "above":
range_query_compare["range"][sentiment_index]["gt"] = sentiment
add_top.append(range_query_compare)
elif sentiment_method == "exact":
range_query_precise["match"][sentiment_index] = sentiment
add_top.append(range_query_precise)
elif sentiment_method == "nonzero":
range_query_precise["match"][sentiment_index] = 0
add_top_negative.append(range_query_precise)
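# "nonzero" is expressed negatively: match a sentiment of exactly 0,
# and let add_top(negative=True) below place it under must_not.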
# Add in the additional information we already populated
self.add_bool(search_query, add_bool)
self.add_top(search_query, add_top)
self.add_top(search_query, add_top_negative, negative=True)
response = self.query(
request.user,
search_query,
index=index,
)
if "message" in response:
return response
2022-09-27 14:15:08 +00:00
# A/D/R - Annotate/Dedup/Reverse
response["object_list"] = self.process_results(
response["object_list"],
annotate=annotate,
dedup=dedup,
dedup_fields=dedup_fields,
reverse=reverse,
)
context = response
return context
def query_single_result(self, request, query_params):
context = self.query_results(request, query_params, size=100)
if not context:
return {"message": "Failed to run query", "message_class": "danger"}
if "message" in context:
return context
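# The nick set is only non-empty when we have results, in which case
# the first hit is surfaced as the single item.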
dedup_set = {item["nick"] for item in context["object_list"]}
if dedup_set:
context["item"] = context["object_list"][0]
return context
def add_bool(self, search_query, add_bool):
"""
Add the specified boolean matches to the search query.
"""
if not add_bool:
return
for item in add_bool:
search_query["query"]["bool"]["must"].append({"match_phrase": item})
def add_top(self, search_query, add_top, negative=False):
"""
Merge add_top with the base of the search_query.
"""
if not add_top:
return
if negative:
for item in add_top:
if "must_not" in search_query["query"]["bool"]:
search_query["query"]["bool"]["must_not"].append(item)
else:
search_query["query"]["bool"]["must_not"] = [item]
else:
for item in add_top:
if "query" not in search_query:
search_query["query"] = {"bool": {"must": []}}
search_query["query"]["bool"]["must"].append(item)