from copy import deepcopy
from datetime import datetime, timedelta

from django.conf import settings
from opensearchpy import OpenSearch
from opensearchpy.exceptions import NotFoundError, RequestError

from core.lib.threshold import annotate_num_chans, annotate_num_users, annotate_online
from core.views.helpers import (
    LookupDenied,
    SearchDenied,
    dedup_list,
    encrypt_list,
    hash_list,
    hash_lookup,
    obfuscate_list,
    randomise_list,
)

# from json import dumps
# pp = lambda x: print(dumps(x, indent=2))


def initialise_opensearch():
    """
    Initialise the OpenSearch API endpoint.

    Build an OpenSearch client from the OPENSEARCH_* Django settings and
    return it.
    """
    auth = (settings.OPENSEARCH_USERNAME, settings.OPENSEARCH_PASSWORD)
    client = OpenSearch(
        # fmt: off
        hosts=[{"host": settings.OPENSEARCH_URL, "port": settings.OPENSEARCH_PORT}],
        http_compress=False,  # gzip compression of request bodies is disabled
        http_auth=auth,
        # client_cert = client_cert_path,
        # client_key = client_key_path,
        use_ssl=settings.OPENSEARCH_TLS,
        # NOTE(review): certificate and hostname verification are switched off,
        # so TLS does not authenticate the server - confirm this is intended.
        verify_certs=False,
        ssl_assert_hostname=False,
        ssl_show_warn=False,
        # a_certs=ca_certs_path,
    )
    return client


# Module-level client shared by the helpers below; created at import time.
client = initialise_opensearch()


def _message(text, message_class="danger"):
    """
    Build the message dict the query helpers return instead of results.
    """
    return {"message": text, "class": message_class}


def _hit_data_key(hit):
    """
    Return the key holding a hit's document data ("_source" or "fields"),
    or None when the hit carries neither.
    """
    if "_source" in hit:
        return "_source"
    if "fields" in hit:
        return "fields"
    return None


def annotate_results(results_parsed):
    """
    Accept a list of dict objects, search for the number of channels and users.
    Add them to the object.
    Mutate it in place. Does not return anything.

    For every distinct "net" value, the nicks and channels of the IRC items
    ("src" == "irc") on that net are looked up via Threshold, and every item
    on that net gains "online", "num_users" and "num_chans" where known.
    """
    # Figure out items with net (not discord)
    nets = {item["net"] for item in results_parsed if "net" in item}

    for net in nets:
        irc_items = [
            item
            for item in results_parsed
            if {"src", "net"}.issubset(item)
            and item["src"] == "irc"
            and item["net"] == net
        ]
        nicks = list({item["nick"] for item in irc_items if "nick" in item})
        channels = list({item["channel"] for item in irc_items if "channel" in item})

        # Annotate the online attribute from Threshold
        online_info = annotate_online(net, nicks)
        # Annotate the number of users in the channel
        num_users = annotate_num_users(net, channels)
        # Annotate the number channels the user is on
        num_chans = annotate_num_chans(net, nicks)

        for item in results_parsed:
            if "net" not in item or item["net"] != net:
                continue
            if "nick" in item:
                nick = item["nick"]
                if nick in online_info:
                    item["online"] = online_info[nick]
                if nick in num_chans:
                    item["num_chans"] = num_chans[nick]
            if "channel" in item and item["channel"] in num_users:
                item["num_users"] = num_users[item["channel"]]


def filter_blacklisted(user, response):
    """
    Low level filter to take the raw OpenSearch response and remove
    objects from it we want to keep secret.
    Does not return, the object is mutated in place.

    Sets response["redacted"] to the number of hits that matched the
    blacklist and response["exemption"] to True for superusers (None
    otherwise). Matching hits are removed, unless the user holds the
    "core.bypass_blacklist" permission, in which case the hit is kept and
    flagged with an "exemption" key in its data.
    """
    response["redacted"] = 0
    response["exemption"] = None
    if user.is_superuser:
        response["exemption"] = True

    hits = response["hits"]["hits"]
    # For every hit from ES
    for index, item in enumerate(hits):
        data_index = _hit_data_key(item)
        if data_index is None:
            # Nothing to inspect on this hit. Keep filtering the rest instead
            # of aborting, which would leave later hits unfiltered.
            continue
        data = item[data_index]

        # For every blacklisted type
        for blacklisted_type, blacklisted_values in settings.OPENSEARCH_BLACKLISTED.items():
            # Check this field we are matching exists
            if blacklisted_type not in data:
                continue
            content = data[blacklisted_type]
            candidates = [str(content)]
            if isinstance(content, (list, tuple)):
                # "fields" responses wrap every value in a list; compare the
                # members too, otherwise the blacklist can never match them.
                candidates.extend(str(value) for value in content)
            if not any(entry in candidates for entry in blacklisted_values):
                continue

            # Let the UI know something was redacted (count each hit once)
            if "exemption" not in data:
                response["redacted"] += 1
            if user.is_anonymous or not user.has_perm("core.bypass_blacklist"):
                # Just set it to None so the index is not off
                hits[index] = None
                break
            data["exemption"] = True

    # Actually get rid of all the things we set to None
    response["hits"]["hits"] = [hit for hit in hits if hit]


def construct_query(query, size, use_query_string=True, tokens=False):
    """
    Accept some query parameters and construct an OpenSearch query.

    Return a bool/must query dict limited to `size` results (5 when size is
    falsy). With tokens=True a simple_query_string clause against the
    "tokens" field is added; otherwise, when use_query_string is true, a
    query_string clause is added. With neither, "must" is left empty for
    the caller to fill.
    """
    if not size:
        size = 5
    query_base = {
        "size": size,
        "query": {"bool": {"must": []}},
    }
    query_string = {
        "query_string": {
            "query": query,
            # "fields": fields,
            # "default_field": "msg",
            # "type": "best_fields",
            "fuzziness": "AUTO",
            "fuzzy_transpositions": True,
            "fuzzy_max_expansions": 50,
            "fuzzy_prefix_length": 0,
            # "minimum_should_match": 1,
            "default_operator": "or",
            "analyzer": "standard",
            "lenient": True,
            "boost": 1,
            "allow_leading_wildcard": True,
            # "enable_position_increments": False,
            "phrase_slop": 3,
            # "max_determinized_states": 10000,
            "quote_field_suffix": "",
            "quote_analyzer": "standard",
            "analyze_wildcard": False,
            "auto_generate_synonyms_phrase_query": True,
        }
    }
    query_tokens = {
        "simple_query_string": {
            # "tokens": query,
            "query": query,
            "fields": ["tokens"],
            "flags": "ALL",
            "fuzzy_transpositions": True,
            "fuzzy_max_expansions": 50,
            "fuzzy_prefix_length": 0,
            "default_operator": "and",
            "analyzer": "standard",
            "lenient": True,
            "boost": 1,
            "quote_field_suffix": "",
            "analyze_wildcard": False,
            "auto_generate_synonyms_phrase_query": False,
        }
    }
    if tokens:
        query_base["query"]["bool"]["must"].append(query_tokens)
    elif use_query_string:
        query_base["query"]["bool"]["must"].append(query_string)
    return query_base


def run_main_query(client, user, query, custom_query=False, index=None, size=None, filter=True):
    """
    Low level helper to run an ES query.
    Accept a user to pass it to the filter, so we can
    avoid filtering for superusers.
    Accept an index (defaults to the main index) and size, the number of
    results to return. When custom_query is true, `query` is sent as the
    request body unchanged; otherwise it is wrapped by construct_query.

    Return the response dict, filtered in place by filter_blacklisted
    unless `filter` is false. On RequestError or NotFoundError the
    exception object is returned (not raised).
    """
    if not index:
        index = settings.OPENSEARCH_INDEX_MAIN
    search_query = query if custom_query else construct_query(query, size)
    try:
        response = client.search(body=search_query, index=index)
    except (RequestError, NotFoundError) as err:
        print("OpenSearch error", err)
        return err
    if filter:
        filter_blacklisted(user, response)
    return response


def parse_results(results):
    """
    Flatten an OpenSearch response into a list of result dicts.

    Each hit's "_source" (or "fields") data gains an "id" taken from the
    hit's "_id"; "host" and "channel" are converted to strings; and the
    "ts" timestamp (or a legacy "time" field, which is renamed to "ts") is
    split on "T" into "date" and "time", dropping fractional seconds.

    Return the list, or False if a hit carries neither "_source" nor
    "fields". "_source" dicts are modified in place.
    """
    results_parsed = []
    stringify = ("host", "channel")
    if "hits" not in results or "hits" not in results["hits"]:
        return results_parsed

    for item in results["hits"]["hits"]:
        data_index = _hit_data_key(item)
        if data_index is None:
            return False
        element = item[data_index]

        # Why are fields in lists...
        # Unwrap them BEFORE stringifying, otherwise str() turns the list
        # into "['value']" and the first "element" is the character "[".
        if data_index == "fields":
            unwrapped = {}
            for key, value in element.items():
                if isinstance(value, (list, tuple)):
                    if not value:
                        continue
                    value = value[0]
                # Non-list values (e.g. the "exemption" flag) pass through
                unwrapped[key] = value
            element = unwrapped

        for field in stringify:
            if field in element:
                element[field] = str(element[field])
        element["id"] = item["_id"]

        # Split the timestamp into date and time
        if "ts" not in element and "time" in element:  # will fix data later
            element["ts"] = element.pop("time")
        if "ts" in element and isinstance(element["ts"], str):
            # partition() tolerates a timestamp without a "T" separator
            date_part, _, time_part = element["ts"].partition("T")
            element["date"] = date_part
            pieces = time_part.split(".")
            # Drop fractional seconds only for a plain "HH:MM:SS.fff" shape
            element["time"] = pieces[0] if len(pieces) == 2 else time_part
        results_parsed.append(element)
    return results_parsed


def query_results(
    request,
    query_params,
    size=None,
    annotate=True,
    custom_query=False,
    reverse=False,
    dedup=False,
    dedup_fields=None,
    lookup_hashes=True,
    tags=None,
):
    """
    API helper to alter the OpenSearch return format into something
    a bit better to parse.
    Accept a HTTP request object. Run the query, and annotate the
    results with the other data we have.

    query_params may carry: "size", "source", "from_date"/"from_time"/
    "to_date"/"to_time", "sorting", "check_sentiment"/"sentiment_method"/
    "sentiment", "query" or "query_full", "index" and "dedup".

    Return one of:
    - a context dict with "object_list", "card", "took" and optional
      "redacted", "exemption", "query", "delay" and "randomised" keys;
    - a {"message": ..., "class": ...} dict when the request is rejected,
      fails, or finds nothing;
    - False when the query produced no usable response.

    The `dedup` argument is the default; a "dedup" entry in query_params
    overrides it ("on" enables, anything else disables).
    """
    user = request.user
    query = None
    search_query = None
    add_bool = []
    add_top = []
    add_top_negative = []
    sort = None
    query_created = False

    # Lookup the hash values but don't disclose them to the user
    denied = []
    if lookup_hashes and settings.HASHING:
        # hash_lookup is handed a copy so the caller's dict is left alone
        query_params = deepcopy(query_params)
        denied.extend(hash_lookup(user, query_params))
        if tags:
            denied.extend(hash_lookup(user, tags, query_params))
    if denied:
        message = "Permission denied: "
        for x in denied:
            if isinstance(x, SearchDenied):
                message += f"Search({x.key}: {x.value}) "
            elif isinstance(x, LookupDenied):
                message += f"Lookup({x.key}: {x.value}) "
        return _message(message)

    if user.is_anonymous:
        sizes = settings.OPENSEARCH_MAIN_SIZES_ANON
    else:
        sizes = settings.OPENSEARCH_MAIN_SIZES
    if not size:
        if "size" in query_params:
            size = query_params["size"]
            if size not in sizes:
                return _message("Size is not permitted")
        else:
            size = 20

    source = None
    if "source" in query_params:
        source = query_params["source"]
        if source not in settings.OPENSEARCH_MAIN_SOURCES:
            return _message("Invalid source")
        if source != "all":
            add_bool.append({"src": source})

    # Whether this user's results must be held back by DELAY_DURATION days
    delayed = bool(
        settings.DELAY_RESULTS
        and source not in settings.SAFE_SOURCES
        and not user.has_perm("core.bypass_delay")
    )
    delay_limit = f"now-{settings.DELAY_DURATION}d" if delayed else None

    date_keys = {"from_date", "to_date", "from_time", "to_time"}
    if date_keys.issubset(query_params.keys()):
        from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
        to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
        range_query = {
            "range": {
                "ts": {
                    "gt": from_ts,
                    "lt": to_ts,
                }
            }
        }
        if delayed:
            try:
                lt_as_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ")
            except ValueError:
                return _message("Invalid date or time")
            # NOTE(review): naive local time is compared with a "Z"
            # timestamp - confirm the server runs in UTC.
            delay_as_ts = datetime.now() - timedelta(days=settings.DELAY_DURATION)
            if lt_as_ts > delay_as_ts:
                # Clamp the upper bound so delayed data stays hidden
                range_query["range"]["ts"]["lt"] = delay_limit
        # The requested range always applies, whatever the delay settings
        add_top.append(range_query)
    elif delayed:
        add_top.append({"range": {"ts": {"lt": delay_limit}}})

    if "sorting" in query_params:
        sorting = query_params["sorting"]
        if sorting not in ("asc", "desc", "none"):
            return _message("Invalid sort")
        if sorting in ("asc", "desc"):
            sort = [
                {
                    "ts": {
                        "order": sorting,
                    }
                }
            ]

    if "check_sentiment" in query_params:
        if "sentiment_method" not in query_params:
            return _message("No sentiment method")
        sentiment = None
        if "sentiment" in query_params:
            try:
                sentiment = float(query_params["sentiment"])
            except (TypeError, ValueError):
                return _message("Sentiment is not a float")
        sentiment_method = query_params["sentiment_method"]
        if sentiment is None and sentiment_method in ("below", "above", "exact"):
            # These methods compare against a value; "nonzero" does not
            return _message("No sentiment value")
        if sentiment_method == "below":
            add_top.append({"range": {"sentiment": {"lt": sentiment}}})
        elif sentiment_method == "above":
            add_top.append({"range": {"sentiment": {"gt": sentiment}}})
        elif sentiment_method == "exact":
            add_top.append({"match": {"sentiment": sentiment}})
        elif sentiment_method == "nonzero":
            add_top_negative.append({"match": {"sentiment": 0}})

    # Only one of query or query_full can be active at once
    # We prefer query because it's simpler
    if "query" in query_params:
        query = query_params["query"]
        search_query = construct_query(query, size, tokens=True)
        query_created = True
    elif "query_full" in query_params:
        query_full = query_params["query_full"]
        if not user.has_perm("core.query_search"):
            return _message("You cannot search by query string")
        search_query = construct_query(query_full, size)
        query_created = True
    elif custom_query:
        search_query = custom_query

    if tags:
        # Get a blank search query
        if not query_created:
            search_query = construct_query(None, size, use_query_string=False)
            query_created = True
        for tagname, tagvalue in tags.items():
            add_bool.append({tagname: tagvalue})

    required_any = ["query_full", "query", "tags"]
    if not any(field in query_params for field in required_any):
        if not custom_query:
            return _message("Empty query!", "warning")
    if search_query is None:
        # e.g. "tags" present in query_params but no tags were supplied
        return _message("Empty query!", "warning")

    bool_clause = search_query["query"]["bool"]
    for item in add_bool:
        bool_clause["must"].append({"match_phrase": item})
    for item in add_top:
        bool_clause["must"].append(item)
    for item in add_top_negative:
        bool_clause.setdefault("must_not", []).append(item)
    if sort:
        search_query["sort"] = sort

    if "index" in query_params:
        index = query_params["index"]
        if index == "main":
            index = settings.OPENSEARCH_INDEX_MAIN
        else:
            if not user.has_perm(f"core.index_{index}"):
                return _message("Not permitted to search by this index")
            if index == "meta":
                index = settings.OPENSEARCH_INDEX_META
            elif index == "int":
                index = settings.OPENSEARCH_INDEX_INT
            else:
                return _message("Index is not valid.")
    else:
        index = settings.OPENSEARCH_INDEX_MAIN

    # Safe sources are exempt from the blacklist filter
    apply_filter = source not in settings.SAFE_SOURCES
    results = run_main_query(
        client,
        user,  # passed through run_main_query to filter_blacklisted
        search_query,
        custom_query=True,
        index=index,
        size=size,
        filter=apply_filter,
    )
    if not results:
        return False
    if isinstance(results, Exception):
        try:
            error_type = results.info["error"]["root_cause"][0]["type"]
        except (AttributeError, KeyError, IndexError, TypeError):
            # The error body does not always have a root cause entry
            error_type = str(results)
        return _message(f"Error: {error_type}")
    if len(results["hits"]["hits"]) == 0:
        return _message("No results.")

    results_parsed = parse_results(results)
    if results_parsed is False:
        # A hit without any document data; nothing sensible to show
        return False

    if annotate:
        annotate_results(results_parsed)

    # A "dedup" query parameter overrides the dedup argument
    if "dedup" in query_params:
        dedup = query_params["dedup"] == "on"

    if reverse:
        results_parsed = results_parsed[::-1]

    if dedup:
        if not dedup_fields:
            dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
        results_parsed = dedup_list(results_parsed, dedup_fields)

    if source not in settings.SAFE_SOURCES:
        if settings.ENCRYPTION:
            encrypt_list(user, results_parsed, settings.ENCRYPTION_KEY)
        if settings.HASHING:
            hash_list(user, results_parsed)
        if settings.OBFUSCATION:
            obfuscate_list(user, results_parsed)
        if settings.RANDOMISATION:
            randomise_list(user, results_parsed)

    # IMPORTANT! - DO NOT PASS query_params to the user!
    context = {
        "object_list": results_parsed,
        "card": results["hits"]["total"]["value"],
        "took": results["took"],
    }
    if "redacted" in results:
        context["redacted"] = results["redacted"]
    if "exemption" in results:
        context["exemption"] = results["exemption"]
    if query:
        context["query"] = query
    if delayed:
        context["delay"] = settings.DELAY_DURATION
    if settings.RANDOMISATION:
        if source not in settings.SAFE_SOURCES:
            if not user.has_perm("core.bypass_randomisation"):
                context["randomised"] = True
    return context


def query_single_result(request, query_params):
    """
    Run query_results with a size of 100 and expose the first result as
    context["item"].

    Return the context dict from query_results (message dicts are passed
    through unchanged), or a "Failed to run query" message dict when the
    query returned nothing usable.
    """
    context = query_results(request, query_params, size=100)
    if not context:
        # "class" matches the message dicts built by query_results;
        # "message_class" is kept for callers that already read it.
        return {
            "message": "Failed to run query",
            "class": "danger",
            "message_class": "danger",
        }
    if "message" in context:
        return context
    if context["object_list"]:
        context["item"] = context["object_list"][0]
    return context