neptune/core/lib/opensearch.py

433 lines
16 KiB
Python

from django.conf import settings
from opensearchpy import OpenSearch
from opensearchpy.exceptions import NotFoundError, RequestError
from core.lib.threshold import annotate_num_chans, annotate_num_users, annotate_online
def initialise_opensearch():
"""
Inititialise the OpenSearch API endpoint.
"""
auth = (settings.OPENSEARCH_USERNAME, settings.OPENSEARCH_PASSWORD)
client = OpenSearch(
# fmt: off
hosts=[{"host": settings.OPENSEARCH_URL,
"port": settings.OPENSEARCH_PORT}],
http_compress=False, # enables gzip compression for request bodies
http_auth=auth,
# client_cert = client_cert_path,
# client_key = client_key_path,
use_ssl=settings.OPENSEARCH_TLS,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False,
# a_certs=ca_certs_path,
)
return client
client = initialise_opensearch()
def annotate_results(results_parsed):
"""
Accept a list of dict objects, search for the number of channels and users.
Add them to the object.
Mutate it in place. Does not return anything.
"""
# Figure out items with net (not discord)
nets = set()
for x in results_parsed:
if "net" in x:
nets.add(x["net"])
for net in nets:
# Annotate the online attribute from Threshold
nicks = list(
set(
[
x["nick"]
for x in results_parsed
if x["src"] == "irc" and x["net"] == net and "nick" in x
]
)
)
channels = list(
set(
[
x["channel"]
for x in results_parsed
if x["src"] == "irc" and x["net"] == net and "channel" in x
]
)
)
online_info = annotate_online(net, nicks)
# Annotate the number of users in the channel
num_users = annotate_num_users(net, channels)
# Annotate the number channels the user is on
num_chans = annotate_num_chans(net, nicks)
for item in results_parsed:
if "net" in item:
if item["net"] == net:
if "nick" in item:
if item["nick"] in online_info:
item["online"] = online_info[item["nick"]]
if "channel" in item:
if item["channel"] in num_users:
item["num_users"] = num_users[item["channel"]]
if "nick" in item:
if item["nick"] in num_chans:
item["num_chans"] = num_chans[item["nick"]]
def filter_blacklisted(user, response):
"""
Low level filter to take the raw OpenSearch response and remove
objects from it we want to keep secret.
Does not return, the object is mutated in place.
"""
response["redacted"] = 0
response["exemption"] = None
if user.is_superuser:
response["exemption"] = True
# is_anonymous = isinstance(user, AnonymousUser)
# For every hit from ES
for index, item in enumerate(list(response["hits"]["hits"])):
# For every blacklisted type
for blacklisted_type in settings.OPENSEARCH_BLACKLISTED.keys():
# Check this field we are matching exists
if "_source" in item.keys():
data_index = "_source"
elif "fields" in item.keys():
data_index = "fields"
else:
return False
if blacklisted_type in item[data_index].keys():
content = item[data_index][blacklisted_type]
# For every item in the blacklisted array for the type
for blacklisted_item in settings.OPENSEARCH_BLACKLISTED[
blacklisted_type
]:
if blacklisted_item == str(content):
# Remove the item
if item in response["hits"]["hits"]:
# Let the UI know something was redacted
if (
"exemption"
not in response["hits"]["hits"][index][data_index]
):
response["redacted"] += 1
# Anonymous
if user.is_anonymous:
# Just set it to none so the index is not off
response["hits"]["hits"][index] = None
else:
if not user.is_superuser:
response["hits"]["hits"][index] = None
else:
response["hits"]["hits"][index][data_index][
"exemption"
] = True
# Actually get rid of all the things we set to None
response["hits"]["hits"] = [hit for hit in response["hits"]["hits"] if hit]
def construct_query(query, size):
"""
Accept some query parameters and construct an OpenSearch query.
"""
if not size:
size = 5
query = {
"size": size,
"query": {
"bool": {
"must": [
{
"query_string": {
"query": query,
# "fields": fields,
# "default_field": "msg",
# "type": "best_fields",
"fuzziness": "AUTO",
"fuzzy_transpositions": True,
"fuzzy_max_expansions": 50,
"fuzzy_prefix_length": 0,
# "minimum_should_match": 1,
"default_operator": "or",
"analyzer": "standard",
"lenient": True,
"boost": 1,
"allow_leading_wildcard": True,
# "enable_position_increments": False,
"phrase_slop": 3,
# "max_determinized_states": 10000,
"quote_field_suffix": "",
"quote_analyzer": "standard",
"analyze_wildcard": False,
"auto_generate_synonyms_phrase_query": True,
}
}
]
}
},
}
return query
def run_main_query(client, user, query, custom_query=False, index=None, size=None):
"""
Low level helper to run an ES query.
Accept a user to pass it to the filter, so we can
avoid filtering for superusers.
Accept fields and size, for the fields we want to match and the
number of results to return.
"""
if not index:
index = settings.OPENSEARCH_INDEX_MAIN
if custom_query:
search_query = query
else:
search_query = construct_query(query, size)
try:
response = client.search(body=search_query, index=index)
except RequestError as err:
print("OpenSearch error", err)
return err
except NotFoundError as err:
print("OpenSearch error", err)
return err
filter_blacklisted(user, response)
return response
def parse_results(results):
results_parsed = []
if "hits" in results.keys():
if "hits" in results["hits"]:
for item in results["hits"]["hits"]:
if "_source" in item.keys():
data_index = "_source"
elif "fields" in item.keys():
data_index = "fields"
else:
return False
element = item[data_index]
# Why are fields in lists...
if data_index == "fields":
element = {k: v[0] for k, v in element.items() if len(v)}
element["id"] = item["_id"]
# Split the timestamp into date and time
if "ts" not in element:
if "time" in element: # will fix data later
ts = element["time"]
del element["time"]
element["ts"] = ts
if "ts" in element:
ts = element["ts"]
ts_spl = ts.split("T")
date = ts_spl[0]
time = ts_spl[1]
element["date"] = date
if "." in time:
time_spl = time.split(".")
if len(time_spl) == 2:
element["time"] = time.split(".")[0]
else:
element["time"] = time
else:
element["time"] = time
results_parsed.append(element)
return results_parsed
def query_results(request, query_params, size=None, annotate=True, custom_query=False):
"""
API helper to alter the OpenSearch return format into something
a bit better to parse.
Accept a HTTP request object. Run the query, and annotate the
results with the other data we have.
"""
# is_anonymous = isinstance(request.user, AnonymousUser)
query = None
message = None
message_class = None
add_bool = []
add_top = []
add_top_negative = []
sort = None
if request.user.is_anonymous:
sizes = settings.OPENSEARCH_MAIN_SIZES_ANON
else:
sizes = settings.OPENSEARCH_MAIN_SIZES
if not size:
if "size" in query_params:
size = query_params["size"]
if size not in sizes:
message = "Size is not permitted"
message_class = "danger"
return {"message": message, "class": message_class}
else:
size = 20
if "source" in query_params:
source = query_params["source"]
if source not in settings.OPENSEARCH_MAIN_SOURCES:
message = "Invalid source"
message_class = "danger"
return {"message": message, "class": message_class}
if source != "all":
add_bool.append({"src": source})
if set({"from_date", "to_date", "from_time", "to_time"}).issubset(
query_params.keys()
):
from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
range_query = {
"range": {
"ts": {
"gt": from_ts,
"lt": to_ts,
}
}
}
add_top.append(range_query)
if "sorting" in query_params:
sorting = query_params["sorting"]
if sorting not in ("asc", "desc", "none"):
message = "Invalid sort"
message_class = "danger"
return {"message": message, "class": message_class}
if sorting in ("asc", "desc"):
sort = [
{
"ts": {
"order": sorting,
}
}
]
if "check_sentiment" in query_params:
if "sentiment_method" not in query_params:
message = "No sentiment method"
message_class = "danger"
return {"message": message, "class": message_class}
if "sentiment" in query_params:
sentiment = query_params["sentiment"]
try:
sentiment = float(sentiment)
except ValueError:
message = "Sentiment is not a float"
message_class = "danger"
return {"message": message, "class": message_class}
sentiment_method = query_params["sentiment_method"]
range_query_compare = {"range": {"sentiment": {}}}
range_query_precise = {
"match": {
"sentiment": None,
}
}
if sentiment_method == "below":
range_query_compare["range"]["sentiment"]["lt"] = sentiment
add_top.append(range_query_compare)
elif sentiment_method == "above":
range_query_compare["range"]["sentiment"]["gt"] = sentiment
add_top.append(range_query_compare)
elif sentiment_method == "exact":
range_query_precise["match"]["sentiment"] = sentiment
add_top.append(range_query_precise)
elif sentiment_method == "nonzero":
range_query_precise["match"]["sentiment"] = 0
add_top_negative.append(range_query_precise)
if "query" in query_params:
query = query_params["query"]
search_query = construct_query(query, size)
else:
if custom_query:
search_query = custom_query
if add_bool:
for item in add_bool:
search_query["query"]["bool"]["must"].append({"match": item})
if add_top:
for item in add_top:
search_query["query"]["bool"]["must"].append(item)
if add_top_negative:
for item in add_top_negative:
if "must_not" in search_query["query"]["bool"]:
search_query["query"]["bool"]["must_not"].append(item)
else:
search_query["query"]["bool"]["must_not"] = [item]
if sort:
search_query["sort"] = sort
if "index" in query_params:
index = query_params["index"]
if index == "main":
index = settings.OPENSEARCH_INDEX_MAIN
else:
if request.user.is_superuser:
if index == "meta":
index = settings.OPENSEARCH_INDEX_META
elif index == "int":
index = settings.OPENSEARCH_INDEX_INT
else:
message = "Index is not valid."
message_class = "danger"
return {"message": message, "class": message_class}
else:
message = "Not permitted to search by this index"
message_class = "danger"
return {"message": message, "class": message_class}
else:
index = settings.OPENSEARCH_INDEX_MAIN
results = run_main_query(
client,
request.user, # passed through run_main_query to filter_blacklisted
search_query,
custom_query=True,
index=index,
size=size,
)
if not results:
return False
if isinstance(results, Exception):
message = results.info["error"]["root_cause"][0]["reason"]
message_class = "danger"
return {"message": message, "class": message_class}
if len(results["hits"]["hits"]) == 0:
message = "No results."
message_class = "danger"
return {"message": message, "class": message_class}
results_parsed = parse_results(results)
if annotate:
annotate_results(results_parsed)
context = {
"object_list": results_parsed,
"card": results["hits"]["total"]["value"],
"took": results["took"],
"redacted": results["redacted"],
"exemption": results["exemption"],
}
if query:
context["query"] = query
return context
def query_single_result(request):
context = query_results(request, request.POST.dict(), size=100)
if not context:
return {"message": "Failed to run query", "message_class": "danger"}
if "message" in context:
return context
dedup_set = {item["nick"] for item in context["object_list"]}
if dedup_set:
context["item"] = context["object_list"][0]
return context