neptune/core/lib/opensearch.py

488 lines
16 KiB
Python
Raw Normal View History

2022-08-26 06:20:30 +00:00
# from copy import deepcopy
# from datetime import datetime, timedelta
2022-08-18 06:20:30 +00:00
2022-07-21 12:47:02 +00:00
from django.conf import settings
from opensearchpy import OpenSearch
2022-08-11 21:45:02 +00:00
from opensearchpy.exceptions import NotFoundError, RequestError
2022-07-21 12:47:02 +00:00
# from json import dumps
# pp = lambda x: print(dumps(x, indent=2))
2022-09-06 10:53:32 +00:00
from core.lib.processing import annotate_results, filter_blacklisted, parse_results
from core.views.helpers import dedup_list
2022-07-21 12:51:55 +00:00
2022-07-21 12:47:02 +00:00
def initialise_opensearch():
    """
    Initialise the OpenSearch API endpoint.

    Connection details (host, port, credentials and whether to use TLS)
    are read from the Django settings. Returns the constructed client.
    """
    credentials = (settings.OPENSEARCH_USERNAME, settings.OPENSEARCH_PASSWORD)
    endpoint = {
        "host": settings.OPENSEARCH_URL,
        "port": settings.OPENSEARCH_PORT,
    }
    options = {
        "hosts": [endpoint],
        # gzip compression of request bodies is left switched off
        "http_compress": False,
        "http_auth": credentials,
        "use_ssl": settings.OPENSEARCH_TLS,
        # NOTE(review): certificate and hostname verification are disabled
        # and the related warnings are silenced - confirm this is intended
        # for every deployment that sets OPENSEARCH_TLS.
        "verify_certs": False,
        "ssl_assert_hostname": False,
        "ssl_show_warn": False,
    }
    return OpenSearch(**options)
2022-07-21 12:51:55 +00:00
# Module-level client, created once when this module is imported.
# query_results() hands it to run_main_query() for every search.
client = initialise_opensearch()
def construct_query(query, size, use_query_string=True, tokens=False):
    """
    Accept some query parameters and construct an OpenSearch query.

    The result is a bool query with at most one "must" clause:
    - tokens=True: a simple_query_string clause restricted to the
      "tokens" field (this takes priority over use_query_string);
    - use_query_string=True: a fuzzy query_string clause;
    - neither: no clause at all, leaving an empty "must" list for the
      caller to fill in.
    A falsy size falls back to 5.
    """
    must_clauses = []
    if tokens:
        must_clauses.append(
            {
                "simple_query_string": {
                    "query": query,
                    "fields": ["tokens"],
                    "flags": "ALL",
                    "fuzzy_transpositions": True,
                    "fuzzy_max_expansions": 50,
                    "fuzzy_prefix_length": 0,
                    "default_operator": "and",
                    "analyzer": "standard",
                    "lenient": True,
                    "boost": 1,
                    "quote_field_suffix": "",
                    "analyze_wildcard": False,
                    "auto_generate_synonyms_phrase_query": False,
                }
            }
        )
    elif use_query_string:
        must_clauses.append(
            {
                "query_string": {
                    "query": query,
                    "fuzziness": "AUTO",
                    "fuzzy_transpositions": True,
                    "fuzzy_max_expansions": 50,
                    "fuzzy_prefix_length": 0,
                    "default_operator": "or",
                    "analyzer": "standard",
                    "lenient": True,
                    "boost": 1,
                    "allow_leading_wildcard": True,
                    "phrase_slop": 3,
                    "quote_field_suffix": "",
                    "quote_analyzer": "standard",
                    "analyze_wildcard": False,
                    "auto_generate_synonyms_phrase_query": True,
                }
            }
        )
    return {
        "size": size if size else 5,
        "query": {"bool": {"must": must_clauses}},
    }
2022-08-09 06:20:30 +00:00
2022-08-26 06:20:30 +00:00
def run_main_query(client, user, query, custom_query=False, index=None, size=None):
    """
    Low level helper to run an OpenSearch query.

    When custom_query is truthy, query is sent as the request body
    unchanged; otherwise it is passed through construct_query together
    with size. A falsy index selects settings.OPENSEARCH_INDEX_MAIN.

    The user is handed to filter_blacklisted along with the response
    (so that filtering can be skipped for superusers).

    Returns the response, or the caught RequestError / NotFoundError
    instance (not raised) when the search fails.
    """
    target_index = index if index else settings.OPENSEARCH_INDEX_MAIN
    body = query if custom_query else construct_query(query, size)
    try:
        response = client.search(body=body, index=target_index)
    except (RequestError, NotFoundError) as err:
        # Callers inspect the returned exception, so hand it back as-is
        print("OpenSearch error", err)
        return err
    filter_blacklisted(user, response)
    return response
2022-08-09 06:20:30 +00:00
def _query_error(message, message_class="danger"):
    """
    Build the message dict query_results returns in place of a context.
    """
    return {"message": message, "class": message_class}


def query_results(
    request,
    query_params,
    size=None,
    annotate=True,
    custom_query=False,
    reverse=False,
    dedup=False,
    dedup_fields=None,
    lookup_hashes=True,
    tags=None,
):
    """
    API helper to alter the OpenSearch return format into something
    a bit better to parse.
    Accept a HTTP request object. Run the query, and annotate the
    results with the other data we have.

    Parameters:
    - request: HTTP request; request.user is used for permission checks
      and is passed on to run_main_query.
    - query_params: mapping of search parameters. Keys read here: size,
      source, from_date/to_date/from_time/to_time, sorting,
      check_sentiment/sentiment_method/sentiment, query, query_full,
      tags, index, dedup.
    - size: number of results; when falsy it is taken from query_params
      (and validated against the permitted sizes) or defaults to 20.
    - annotate: run annotate_results on the parsed results.
    - custom_query: a ready-made query body, used when query_params has
      neither "query" nor "query_full".
    - reverse: reverse the parsed result list.
    - dedup: currently overwritten from query_params (see note below).
    - dedup_fields: fields passed to dedup_list; a default set is used
      when falsy.
    - lookup_hashes: currently unused; kept for interface compatibility.
    - tags: mapping of field -> value, each added as a match_phrase
      clause.

    Returns:
    - a context dict with "object_list", "card" and "took" (plus
      "redacted", "exemption" and "query" when available) on success;
    - a dict with "message" and "class" keys when validation or the
      search fails;
    - False when run_main_query returns a falsy response.
    """
    query = None
    search_query = None
    query_created = False
    sort = None
    add_bool = []
    add_top = []
    add_top_negative = []

    # --- result size -----------------------------------------------------
    if request.user.is_anonymous:
        sizes = settings.OPENSEARCH_MAIN_SIZES_ANON
    else:
        sizes = settings.OPENSEARCH_MAIN_SIZES
    if not size:
        if "size" in query_params:
            size = query_params["size"]
            if size not in sizes:
                return _query_error("Size is not permitted")
        else:
            size = 20

    # --- source filter ---------------------------------------------------
    source = None
    if "source" in query_params:
        source = query_params["source"]
        if source in settings.OPENSEARCH_SOURCES_RESTRICTED:
            if not request.user.has_perm("core.restricted_sources"):
                return _query_error("Access denied")
        elif source not in settings.OPENSEARCH_MAIN_SOURCES:
            return _query_error("Invalid source")
        if source == "all":
            source = None  # the next block will populate it

    if source:
        sources = [source]
    else:
        # Copy the list: appending to settings.OPENSEARCH_MAIN_SOURCES
        # itself would permanently add the restricted sources to the
        # shared setting, exposing them to every later request.
        sources = list(settings.OPENSEARCH_MAIN_SOURCES)
        if request.user.has_perm("core.restricted_sources"):
            sources.extend(settings.OPENSEARCH_SOURCES_RESTRICTED)

    add_top.append(
        {
            "bool": {
                "should": [
                    {"match_phrase": {"src": source_iter}} for source_iter in sources
                ]
            }
        }
    )

    # --- date range (only when all four parts are supplied) -------------
    date_keys = {"from_date", "to_date", "from_time", "to_time"}
    if date_keys.issubset(query_params.keys()):
        from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
        to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
        add_top.append({"range": {"ts": {"gt": from_ts, "lt": to_ts}}})

    # --- sorting ---------------------------------------------------------
    if "sorting" in query_params:
        sorting = query_params["sorting"]
        if sorting not in ("asc", "desc", "none"):
            return _query_error("Invalid sort")
        if sorting in ("asc", "desc"):
            sort = [{"ts": {"order": sorting}}]

    # --- sentiment -------------------------------------------------------
    if "check_sentiment" in query_params:
        if "sentiment_method" not in query_params:
            return _query_error("No sentiment method")
        sentiment = None
        if "sentiment" in query_params:
            try:
                sentiment = float(query_params["sentiment"])
            except (TypeError, ValueError):
                return _query_error("Sentiment is not a float")
        sentiment_method = query_params["sentiment_method"]
        # "nonzero" needs no value; the comparisons below do
        if sentiment is None and sentiment_method in ("below", "above", "exact"):
            return _query_error("No sentiment value")
        if sentiment_method == "below":
            add_top.append({"range": {"sentiment": {"lt": sentiment}}})
        elif sentiment_method == "above":
            add_top.append({"range": {"sentiment": {"gt": sentiment}}})
        elif sentiment_method == "exact":
            add_top.append({"match": {"sentiment": sentiment}})
        elif sentiment_method == "nonzero":
            add_top_negative.append({"match": {"sentiment": 0}})

    # --- base query ------------------------------------------------------
    # Only one of query or query_full can be active at once
    # We prefer query because it's simpler
    if "query" in query_params:
        query = query_params["query"]
        search_query = construct_query(query, size, tokens=True)
        query_created = True
    elif "query_full" in query_params:
        search_query = construct_query(query_params["query_full"], size)
        query_created = True
    elif custom_query:
        search_query = custom_query

    if tags:
        # Get a blank search query
        if not query_created:
            search_query = construct_query(None, size, use_query_string=False)
            query_created = True
        for tagname, tagvalue in tags.items():
            add_bool.append({tagname: tagvalue})

    required_any = ("query_full", "query", "tags")
    if not custom_query and not any(field in query_params for field in required_any):
        return _query_error("Empty query!", "warning")
    if search_query is None:
        # e.g. "tags" present in query_params but no tags were passed in:
        # there is nothing to build a query from.
        return _query_error("Empty query!", "warning")

    # --- merge the extra clauses into the bool query ---------------------
    bool_clause = search_query["query"]["bool"]
    # setdefault: a custom query may come without a "must" list
    must_clauses = bool_clause.setdefault("must", [])
    must_clauses.extend({"match_phrase": item} for item in add_bool)
    must_clauses.extend(add_top)
    if add_top_negative:
        bool_clause.setdefault("must_not", []).extend(add_top_negative)
    if sort:
        search_query["sort"] = sort

    # --- index selection -------------------------------------------------
    if "index" in query_params:
        index = query_params["index"]
        if index == "main":
            index = settings.OPENSEARCH_INDEX_MAIN
        else:
            if not request.user.has_perm(f"core.index_{index}"):
                return _query_error("Not permitted to search by this index")
            if index == "meta":
                index = settings.OPENSEARCH_INDEX_META
            elif index == "internal":
                index = settings.OPENSEARCH_INDEX_INT
            else:
                return _query_error("Index is not valid.")
    else:
        index = settings.OPENSEARCH_INDEX_MAIN

    # --- run -------------------------------------------------------------
    results = run_main_query(
        client,
        request.user,  # passed through run_main_query to filter_blacklisted
        search_query,
        custom_query=True,
        index=index,
        size=size,
    )
    if not results:
        return False
    if isinstance(results, Exception):
        # run_main_query returns (rather than raises) search errors
        try:
            error_type = results.info["error"]["root_cause"][0]["type"]
        except (AttributeError, KeyError, IndexError, TypeError):
            # Error payload is not in the expected shape
            error_type = type(results).__name__
        return _query_error(f"Error: {error_type}")
    if len(results["hits"]["hits"]) == 0:
        return _query_error("No results.")

    # --- post-processing -------------------------------------------------
    results_parsed = parse_results(results)
    if annotate:
        annotate_results(results_parsed)

    # NOTE(review): the dedup argument is always overwritten here, so only
    # query_params["dedup"] == "on" enables deduplication - confirm whether
    # callers passing dedup=True expect it to be honoured.
    dedup = "dedup" in query_params and query_params["dedup"] == "on"

    if reverse:
        results_parsed = results_parsed[::-1]
    if dedup:
        if not dedup_fields:
            dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
        results_parsed = dedup_list(results_parsed, dedup_fields)

    # IMPORTANT! - DO NOT PASS query_params to the user!
    context = {
        "object_list": results_parsed,
        "card": results["hits"]["total"]["value"],
        "took": results["took"],
    }
    for key in ("redacted", "exemption"):
        if key in results:
            context[key] = results[key]
    if query:
        context["query"] = query
    return context
2022-07-21 12:51:55 +00:00
2022-08-26 20:03:21 +00:00
def query_single_result(request, query_params):
    """
    Run query_results with a size of 100 and expose the first result.

    Returns:
    - a message dict when the query could not be run or query_results
      reported a message;
    - otherwise the query_results context, with the first entry of
      "object_list" additionally stored under "item" when the list is
      not empty.
    """
    context = query_results(request, query_params, size=100)
    if not context:
        return {
            "message": "Failed to run query",
            # "class" matches the key used by query_results' messages;
            # "message_class" is kept for existing consumers.
            "class": "danger",
            "message_class": "danger",
        }
    if "message" in context:
        return context
    object_list = context["object_list"]
    if object_list:
        context["item"] = object_list[0]
    return context