Finish reimplementing Elasticsearch

This commit is contained in:
Mark Veidemanis 2022-11-23 18:15:42 +00:00
parent 7008c365a6
commit 0fd004ca7d
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
5 changed files with 218 additions and 324 deletions

View File

@ -2,7 +2,7 @@ import stripe
from django.conf import settings from django.conf import settings
from redis import StrictRedis from redis import StrictRedis
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) r = StrictRedis(unix_socket_path="/var/run/socks/redis.sock", db=0)
if settings.STRIPE_TEST: if settings.STRIPE_TEST:
stripe.api_key = settings.STRIPE_API_KEY_TEST stripe.api_key = settings.STRIPE_API_KEY_TEST

View File

@ -97,22 +97,22 @@ class StorageBackend(ABC):
index = settings.INDEX_MAIN index = settings.INDEX_MAIN
return index return index
def parse_query(self, query_params, tags, size, index, custom_query, add_bool): def parse_query(self, query_params, tags, size, custom_query, add_bool, **kwargs):
query_created = False query_created = False
if "query" in query_params: if "query" in query_params:
query = query_params["query"] query = query_params["query"]
search_query = self.construct_query(query, size, index) search_query = self.construct_query(query, size, **kwargs)
query_created = True query_created = True
else: else:
if custom_query: if custom_query:
search_query = custom_query search_query = custom_query
else: else:
search_query = self.construct_query(None, size, index, blank=True) search_query = self.construct_query(None, size, blank=True, **kwargs)
if tags: if tags:
# Get a blank search query # Get a blank search query
if not query_created: if not query_created:
search_query = self.construct_query(None, size, index, blank=True) search_query = self.construct_query(None, size, blank=True, **kwargs)
query_created = True query_created = True
for item in tags: for item in tags:
for tagname, tagvalue in item.items(): for tagname, tagvalue in item.items():
@ -232,9 +232,7 @@ class StorageBackend(ABC):
if blacklisted_type in item[data_index].keys(): if blacklisted_type in item[data_index].keys():
content = item[data_index][blacklisted_type] content = item[data_index][blacklisted_type]
# For every item in the blacklisted array for the type # For every item in the blacklisted array for the type
for blacklisted_item in settings.ELASTICSEARCH_BLACKLISTED[ for blacklisted_item in settings.BLACKLISTED[blacklisted_type]:
blacklisted_type
]:
if blacklisted_item == str(content): if blacklisted_item == str(content):
# Remove the item # Remove the item
if item in response["hits"]["hits"]: if item in response["hits"]["hits"]:
@ -259,7 +257,7 @@ class StorageBackend(ABC):
# Actually get rid of all the things we set to None # Actually get rid of all the things we set to None
response["hits"]["hits"] = [hit for hit in response["hits"]["hits"] if hit] response["hits"]["hits"] = [hit for hit in response["hits"]["hits"] if hit]
def query(self, user, search_query): def query(self, user, search_query, **kwargs):
# For time tracking # For time tracking
start = time.process_time() start = time.process_time()
if settings.CACHE: if settings.CACHE:
@ -281,7 +279,19 @@ class StorageBackend(ABC):
"took": time_took_rounded, "took": time_took_rounded,
"cache": True, "cache": True,
} }
response = self.run_query(user, search_query) response = self.run_query(user, search_query, **kwargs)
# For Elasticsearch
if isinstance(response, Exception):
message = f"Error: {response.info['error']['root_cause'][0]['type']}"
message_class = "danger"
return {"message": message, "class": message_class}
if len(response["hits"]["hits"]) == 0:
message = "No results."
message_class = "danger"
return {"message": message, "class": message_class}
# For Druid
if "error" in response: if "error" in response:
if "errorMessage" in response: if "errorMessage" in response:
context = { context = {
@ -296,7 +306,9 @@ class StorageBackend(ABC):
if "took" in response: if "took" in response:
if response["took"] is None: if response["took"] is None:
return None return None
self.filter_blacklisted(user, response)
# Removed for now, no point given we have restricted indexes
# self.filter_blacklisted(user, response)
# Parse the response # Parse the response
response_parsed = self.parse(response) response_parsed = self.parse(response)

View File

@ -77,7 +77,8 @@ class DruidBackend(StorageBackend):
self.add_type("or", search_query, extra_should2) self.add_type("or", search_query, extra_should2)
return search_query return search_query
def construct_query(self, query, size, index, blank=False): def construct_query(self, query, size, blank=False, **kwargs):
index = kwargs.get("index")
search_query = { search_query = {
"limit": size, "limit": size,
"queryType": "scan", "queryType": "scan",
@ -107,19 +108,13 @@ class DruidBackend(StorageBackend):
def parse(self, response):
    """Run the raw Druid response through parse_druid and return the result."""
    return parse_druid(response)
def run_query(self, user, search_query, **kwargs):
    """
    Low level helper to run a Druid query.

    POST search_query as JSON to the Druid endpoint and return the decoded
    JSON response body.

    user is accepted for signature parity with the other backends and is
    not used here. Extra keyword arguments are accepted and ignored so the
    shared StorageBackend.query() can forward its **kwargs to any backend.
    """
    # The previous pretty-printed dump of search_query only fed debug
    # prints that have been removed, so it is no longer built.
    response = requests.post("http://druid:8082/druid/v2", json=search_query)
    return orjson.loads(response.text)
def filter_blacklisted(self, user, response): def filter_blacklisted(self, user, response):
@ -172,7 +167,7 @@ class DruidBackend(StorageBackend):
# Q/T - Query/Tags # Q/T - Query/Tags
search_query = self.parse_query( search_query = self.parse_query(
query_params, tags, size, index, custom_query, add_bool query_params, tags, size, custom_query, add_bool, index=index
) )
# Query should be a dict, so check if it contains message here # Query should be a dict, so check if it contains message here
if "message" in search_query: if "message" in search_query:

View File

@ -9,7 +9,7 @@ from core.db import StorageBackend
# from json import dumps # from json import dumps
# pp = lambda x: print(dumps(x, indent=2)) # pp = lambda x: print(dumps(x, indent=2))
from core.db.processing import annotate_results, parse_results from core.db.processing import parse_results
from core.views import helpers from core.views import helpers
@ -23,27 +23,114 @@ class ElasticsearchBackend(StorageBackend):
""" """
auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD) auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
client = Elasticsearch( client = Elasticsearch(
# fmt: off settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False
hosts=[{"host": settings.ELASTICSEARCH_URL,
"port": settings.ELASTICSEARCH_PORT}],
http_compress=False, # enables gzip compression for request bodies
http_auth=auth,
# client_cert = client_cert_path,
# client_key = client_key_path,
use_ssl=settings.ELASTICSEARCH_TLS,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False,
# a_certs=ca_certs_path,
) )
self.client = client self.client = client
def construct_context_query(
    self, index, net, channel, src, num, size, type=None, nicks=None
):
    """
    Build the Elasticsearch query used to fetch the context of a message.

    Starts from the blank query returned by construct_query() and appends
    clauses to its bool/must list:

    * every clause collected in extra_must (num, net, channel);
    * one nested bool/should with the message-type clauses;
    * one nested bool/should with the nick clauses, when there are any.

    index selects the field used for message types: "internal" matches on
    "mtype" (and adds it to the returned fields), anything else on "type".
    For the internal index, a channel of "*status" or a type of "znc", and
    a type of "auth", drop the channel restriction and use their own
    should-clauses instead.

    src is accepted for interface compatibility but is not used when
    building the query. type shadows the builtin; the name is kept because
    callers pass it as a keyword argument.

    Returns the query dict, with "fields" set to the list of fields to
    return.
    """
    # Get the initial query
    query = self.construct_query(None, size, blank=True)

    channel_match = {"match": {"channel": channel}}
    extra_must = []
    extra_should = []
    extra_should2 = []

    if num:
        # "equals" is not an Elasticsearch query type; "match" is the
        # clause used everywhere else in this query.
        extra_must.append({"match": {"num": num}})
    if net:
        extra_must.append({"match_phrase": {"net": net}})
    if channel:
        extra_must.append(channel_match)
    if nicks:
        extra_should2.extend({"match": {"nick": nick}} for nick in nicks)

    types = ["msg", "notice", "action", "kick", "topic", "mode"]
    fields = [
        "nick",
        "ident",
        "host",
        "channel",
        "ts",
        "msg",
        "type",
        "net",
        "src",
        "tokens",
    ]

    if index == "internal":
        fields.append("mtype")
        if channel == "*status" or type == "znc":
            # Not scoped to a channel: drop the channel restriction and
            # replace the nick clauses collected above.
            if channel_match in extra_must:
                extra_must.remove(channel_match)
            extra_should.append({"match": {"type": "znc"}})
            extra_should.append({"match": {"type": "self"}})
            extra_should2 = [
                {"match": {"type": "znc"}},
                {"match": {"nick": channel}},
            ]
        elif type == "auth":
            if channel_match in extra_must:
                extra_must.remove(channel_match)
            extra_should2 = [
                {"match": {"nick": channel}},
                {"match": {"type": "self"}},
            ]
            extra_should.append({"match": {"type": "query"}})
            extra_should.append({"match": {"nick": channel}})
        else:
            extra_should.extend({"match": {"mtype": ctype}} for ctype in types)
    else:
        extra_should.extend({"match": {"type": ctype}} for ctype in types)

    query["fields"] = fields

    must = query["query"]["bool"]["must"]
    must.extend(extra_must)
    if extra_should:
        must.append({"bool": {"should": extra_should}})
    if extra_should2:
        must.append({"bool": {"should": extra_should2}})
    return query
def construct_query(self, query, size, blank=False):
""" """
Accept some query parameters and construct an Elasticsearch query. Accept some query parameters and construct an Elasticsearch query.
""" """
if not size:
size = 5
query_base = { query_base = {
"size": size, "size": size,
"query": {"bool": {"must": []}}, "query": {"bool": {"must": []}},
@ -73,33 +160,15 @@ class ElasticsearchBackend(StorageBackend):
"auto_generate_synonyms_phrase_query": True, "auto_generate_synonyms_phrase_query": True,
} }
} }
query_tokens = { if not blank:
"simple_query_string": {
# "tokens": query,
"query": query,
"fields": ["tokens"],
"flags": "ALL",
"fuzzy_transpositions": True,
"fuzzy_max_expansions": 50,
"fuzzy_prefix_length": 0,
"default_operator": "and",
"analyzer": "standard",
"lenient": True,
"boost": 1,
"quote_field_suffix": "",
"analyze_wildcard": False,
"auto_generate_synonyms_phrase_query": False,
}
}
if tokens:
query_base["query"]["bool"]["must"].append(query_tokens)
# query["query"]["bool"]["must"].append(query_string)
# query["query"]["bool"]["must"][0]["query_string"]["fields"] = ["tokens"]
elif use_query_string:
query_base["query"]["bool"]["must"].append(query_string) query_base["query"]["bool"]["must"].append(query_string)
return query_base return query_base
def parse(self, response):
    """Run the raw Elasticsearch response through parse_results and return the result."""
    return parse_results(response)
def run_query(self, user, search_query, **kwargs):
""" """
Low level helper to run an ES query. Low level helper to run an ES query.
Accept a user to pass it to the filter, so we can Accept a user to pass it to the filter, so we can
@ -107,14 +176,9 @@ class ElasticsearchBackend(StorageBackend):
Accept fields and size, for the fields we want to match and the Accept fields and size, for the fields we want to match and the
number of results to return. number of results to return.
""" """
if not index: index = kwargs.get("index")
index = settings.INDEX_MAIN
if custom_query:
search_query = query
else:
search_query = self.construct_query(query, size)
try: try:
response = client.search(body=search_query, index=index) response = self.client.search(body=search_query, index=index)
except RequestError as err: except RequestError as err:
print("Elasticsearch error", err) print("Elasticsearch error", err)
return err return err
@ -136,14 +200,9 @@ class ElasticsearchBackend(StorageBackend):
tags=None, tags=None,
): ):
query = None
message = None
message_class = None
add_bool = [] add_bool = []
add_top = [] add_top = []
add_top_negative = [] add_top_negative = []
sort = None
query_created = False
helpers.add_defaults(query_params) helpers.add_defaults(query_params)
@ -171,106 +230,36 @@ class ElasticsearchBackend(StorageBackend):
return size return size
# I - Index # I - Index
if "index" in query_params: index = self.parse_index(request.user, query_params)
index = query_params["index"] if isinstance(index, dict):
if index == "main": return index
index = settings.INDEX_MAIN
else:
if not request.user.has_perm(f"core.index_{index}"):
message = "Not permitted to search by this index"
message_class = "danger"
return {
"message": message,
"class": message_class,
}
if index == "meta":
index = settings.INDEX_META
elif index == "internal":
index = settings.INDEX_INT
else:
message = "Index is not valid."
message_class = "danger"
return {
"message": message,
"class": message_class,
}
else:
index = settings.INDEX_MAIN
# Q/T - Query/Tags # Q/T - Query/Tags
# Only one of query or query_full can be active at once search_query = self.parse_query(
# We prefer query because it's simpler query_params, tags, size, custom_query, add_bool
if "query" in query_params: )
query = query_params["query"] # Query should be a dict, so check if it contains message here
search_query = self.construct_query(query, size, tokens=True) if "message" in search_query:
query_created = True return search_query
elif "query_full" in query_params:
query_full = query_params["query_full"]
# if request.user.has_perm("core.query_search"):
search_query = self.construct_query(query_full, size)
query_created = True
# else:
# message = "You cannot search by query string"
# message_class = "danger"
# return {"message": message, "class": message_class}
else:
if custom_query:
search_query = custom_query
if tags:
# Get a blank search query
if not query_created:
search_query = self.construct_query(None, size, use_query_string=False)
query_created = True
for tagname, tagvalue in tags.items():
add_bool.append({tagname: tagvalue})
required_any = ["query_full", "query", "tags"]
if not any([field in query_params.keys() for field in required_any]):
if not custom_query:
message = "Empty query!"
message_class = "warning"
return {"message": message, "class": message_class}
# S - Sources # S - Sources
source = None sources = self.parse_source(request.user, query_params)
if "source" in query_params: if isinstance(sources, dict):
source = query_params["source"] return sources
total_count = len(sources)
if source in settings.SOURCES_RESTRICTED: total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
if not request.user.has_perm("core.restricted_sources"): if total_count != total_sources:
message = "Access denied" add_top_tmp = {"bool": {"should": []}}
message_class = "danger" for source_iter in sources:
return {"message": message, "class": message_class} add_top_tmp["bool"]["should"].append(
elif source not in settings.MAIN_SOURCES: {"match_phrase": {"src": source_iter}}
message = "Invalid source" )
message_class = "danger" add_top.append(add_top_tmp)
return {"message": message, "class": message_class}
if source == "all":
source = None # the next block will populate it
if source:
sources = [source]
else:
sources = settings.MAIN_SOURCES
if request.user.has_perm("core.restricted_sources"):
for source_iter in settings.SOURCES_RESTRICTED:
sources.append(source_iter)
add_top_tmp = {"bool": {"should": []}}
for source_iter in sources:
add_top_tmp["bool"]["should"].append({"match_phrase": {"src": source_iter}})
add_top.append(add_top_tmp)
# R - Ranges # R - Ranges
# date_query = False # date_query = False
if set({"from_date", "to_date", "from_time", "to_time"}).issubset( from_ts, to_ts = self.parse_date_time(query_params)
query_params.keys() if from_ts:
):
from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
range_query = { range_query = {
"range": { "range": {
"ts": { "ts": {
@ -281,70 +270,28 @@ class ElasticsearchBackend(StorageBackend):
} }
add_top.append(range_query) add_top.append(range_query)
# if date_query:
# if settings.DELAY_RESULTS:
# if source not in settings.SAFE_SOURCES:
# if request.user.has_perm("core.bypass_delay"):
# add_top.append(range_query)
# else:
# delay_as_ts = datetime.now() - timedelta(
# days=settings.DELAY_DURATION
# )
# lt_as_ts = datetime.strptime(
# range_query["range"]["ts"]["lt"], "%Y-%m-%dT%H:%MZ"
# )
# if lt_as_ts > delay_as_ts:
# range_query["range"]["ts"][
# "lt"
# ] = f"now-{settings.DELAY_DURATION}d"
# add_top.append(range_query)
# else:
# add_top.append(range_query)
# else:
# if settings.DELAY_RESULTS:
# if source not in settings.SAFE_SOURCES:
# if not request.user.has_perm("core.bypass_delay"):
# range_query = {
# "range": {
# "ts": {
# # "gt": ,
# "lt": f"now-{settings.DELAY_DURATION}d",
# }
# }
# }
# add_top.append(range_query)
# S - Sort # S - Sort
if "sorting" in query_params: sort = self.parse_sort(query_params)
sorting = query_params["sorting"] if isinstance(sort, dict):
if sorting not in ("asc", "desc", "none"): return sort
message = "Invalid sort" if sort:
message_class = "danger" # For Druid compatibility
return {"message": message, "class": message_class} sort_map = {"ascending": "asc", "descending": "desc"}
if sorting in ("asc", "desc"): sorting = [
sort = [ {
{ "ts": {
"ts": { "order": sort_map[sort],
"order": sorting,
}
} }
] }
]
search_query["sort"] = sorting
# S - Sentiment # S - Sentiment
if "check_sentiment" in query_params: sentiment_r = self.parse_sentiment(query_params)
if "sentiment_method" not in query_params: if isinstance(sentiment_r, dict):
message = "No sentiment method" return sentiment_r
message_class = "danger" if sentiment_r:
return {"message": message, "class": message_class} sentiment_method, sentiment = sentiment_r
if "sentiment" in query_params:
sentiment = query_params["sentiment"]
try:
sentiment = float(sentiment)
except ValueError:
message = "Sentiment is not a float"
message_class = "danger"
return {"message": message, "class": message_class}
sentiment_method = query_params["sentiment_method"]
range_query_compare = {"range": {"sentiment": {}}} range_query_compare = {"range": {"sentiment": {}}}
range_query_precise = { range_query_precise = {
"match": { "match": {
@ -364,100 +311,27 @@ class ElasticsearchBackend(StorageBackend):
range_query_precise["match"]["sentiment"] = 0 range_query_precise["match"]["sentiment"] = 0
add_top_negative.append(range_query_precise) add_top_negative.append(range_query_precise)
if add_bool: # Add in the additional information we already populated
# if "bool" not in search_query["query"]: self.add_bool(search_query, add_bool)
# search_query["query"]["bool"] = {} self.add_top(search_query, add_top)
# if "must" not in search_query["query"]["bool"]: self.add_top(search_query, add_top_negative, negative=True)
# search_query["query"]["bool"] = {"must": []}
for item in add_bool: response = self.query(
search_query["query"]["bool"]["must"].append({"match_phrase": item}) request.user,
if add_top:
for item in add_top:
search_query["query"]["bool"]["must"].append(item)
if add_top_negative:
for item in add_top_negative:
if "must_not" in search_query["query"]["bool"]:
search_query["query"]["bool"]["must_not"].append(item)
else:
search_query["query"]["bool"]["must_not"] = [item]
if sort:
search_query["sort"] = sort
results = self.query(
request.user, # passed through run_main_query to filter_blacklisted
search_query, search_query,
custom_query=True,
index=index, index=index,
size=size,
) )
if not results:
return False
if isinstance(results, Exception):
message = f"Error: {results.info['error']['root_cause'][0]['type']}"
message_class = "danger"
return {"message": message, "class": message_class}
if len(results["hits"]["hits"]) == 0:
message = "No results."
message_class = "danger"
return {"message": message, "class": message_class}
results_parsed = parse_results(results)
# A/D/R - Annotate/Dedup/Reverse # A/D/R - Annotate/Dedup/Reverse
if annotate: self.process_results(
annotate_results(results_parsed) response,
if "dedup" in query_params: annotate=annotate,
if query_params["dedup"] == "on": dedup=dedup,
dedup = True dedup_fields=dedup_fields,
else: reverse=reverse,
dedup = False )
else:
dedup = False
if reverse: context = response
results_parsed = results_parsed[::-1]
if dedup:
if not dedup_fields:
dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
results_parsed = helpers.dedup_list(results_parsed, dedup_fields)
# if source not in settings.SAFE_SOURCES:
# if settings.ENCRYPTION:
# encrypt_list(request.user, results_parsed, settings.ENCRYPTION_KEY)
# if settings.HASHING:
# hash_list(request.user, results_parsed)
# if settings.OBFUSCATION:
# obfuscate_list(request.user, results_parsed)
# if settings.RANDOMISATION:
# randomise_list(request.user, results_parsed)
# process_list(results)
# IMPORTANT! - DO NOT PASS query_params to the user!
context = {
"object_list": results_parsed,
"card": results["hits"]["total"]["value"],
"took": results["took"],
}
if "redacted" in results:
context["redacted"] = results["redacted"]
if "exemption" in results:
context["exemption"] = results["exemption"]
if query:
context["query"] = query
# if settings.DELAY_RESULTS:
# if source not in settings.SAFE_SOURCES:
# if not request.user.has_perm("core.bypass_delay"):
# context["delay"] = settings.DELAY_DURATION
# if settings.RANDOMISATION:
# if source not in settings.SAFE_SOURCES:
# if not request.user.has_perm("core.bypass_randomisation"):
# context["randomised"] = True
return context return context
def query_single_result(self, request, query_params): def query_single_result(self, request, query_params):
@ -472,3 +346,28 @@ class ElasticsearchBackend(StorageBackend):
context["item"] = context["object_list"][0] context["item"] = context["object_list"][0]
return context return context
def add_bool(self, search_query, add_bool):
    """
    Add the specified boolean matches to search query.

    Each item of add_bool is wrapped in a "match_phrase" clause and
    appended, in order, to search_query["query"]["bool"]["must"]. The
    query is modified in place and nothing is returned.

    An empty or None add_bool leaves search_query untouched.
    """
    if not add_bool:
        return
    # Create the "must" list on demand so a query whose bool clause has no
    # "must" yet does not raise KeyError.
    must = search_query["query"]["bool"].setdefault("must", [])
    must.extend({"match_phrase": item} for item in add_bool)
def add_top(self, search_query, add_top, negative=False):
    """
    Merge add_top with the base of the search_query.

    The items of add_top are appended, in order, to the "must" list of
    search_query["query"]["bool"], or to its "must_not" list when negative
    is true. The query is modified in place and nothing is returned.

    An empty or None add_top leaves search_query untouched.
    """
    if not add_top:
        return
    target = "must_not" if negative else "must"
    # Create the target list on demand for both branches; previously only
    # "must_not" was created when missing, and a missing "must" raised
    # KeyError.
    search_query["query"]["bool"].setdefault(target, []).extend(add_top)

View File

@ -377,6 +377,7 @@ class DrilldownContextModal(APIView):
type=type, type=type,
nicks=nicks_sensitive, nicks=nicks_sensitive,
) )
print("QUERY", search_query)
results = db.query_results( results = db.query_results(
request, request,
query_params, query_params,
@ -389,19 +390,6 @@ class DrilldownContextModal(APIView):
if "message" in results: if "message" in results:
return render(request, self.template_name, results) return render(request, self.template_name, results)
# if settings.HASHING: # we probably want to see the tokens
# if query_params["source"] not in settings.SAFE_SOURCES:
# if not request.user.has_perm("core.bypass_hashing"):
# for index, item in enumerate(results["object_list"]):
# if "tokens" in item:
# results["object_list"][index]["msg"] = results[
# "object_list"
# ][index].pop("tokens")
# # item["msg"] = item.pop("tokens")
# Make the time nicer
# for index, item in enumerate(results["object_list"]):
# results["object_list"][index]["time"] = item["time"]+"SSS"
unique = str(uuid.uuid4())[:8] unique = str(uuid.uuid4())[:8]
context = { context = {
"net": query_params["net"], "net": query_params["net"],