# from copy import deepcopy
# from datetime import datetime, timedelta

from django.conf import settings
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError, RequestError

from core.db import StorageBackend

# from json import dumps
# pp = lambda x: print(dumps(x, indent=2))
from core.db.processing import annotate_results, parse_results
from core.views import helpers


class ElasticsearchBackend(StorageBackend):
    """Storage backend that builds and runs search queries against Elasticsearch."""

    def __init__(self):
        super().__init__("Elasticsearch")

    def initialise(self, **kwargs):
        """
        Initialise the Elasticsearch API endpoint.

        Builds a client from the ELASTICSEARCH_* Django settings and stores it
        on ``self.client``.
        """
        auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
        # NOTE(review): certificate and hostname verification are disabled
        # below even when TLS is on -- confirm this is intended outside of
        # development deployments.
        client = Elasticsearch(
            # fmt: off
            hosts=[{"host": settings.ELASTICSEARCH_URL,
                    "port": settings.ELASTICSEARCH_PORT}],
            http_compress=False,  # gzip compression of request bodies (disabled)
            http_auth=auth,
            # client_cert = client_cert_path,
            # client_key = client_key_path,
            use_ssl=settings.ELASTICSEARCH_TLS,
            verify_certs=False,
            ssl_assert_hostname=False,
            ssl_show_warn=False,
            # a_certs=ca_certs_path,
        )
        self.client = client

    def construct_query(self, query, size, use_query_string=True, tokens=False):
        """
        Accept some query parameters and construct an Elasticsearch query.

        :param query: the user-supplied query text
        :param size: number of results to request; falls back to 5 when falsy
        :param use_query_string: add a ``query_string`` clause for ``query``
        :param tokens: add a ``simple_query_string`` clause restricted to the
            ``tokens`` field instead; takes precedence over
            ``use_query_string``
        :return: a search body of the form
            ``{"size": ..., "query": {"bool": {"must": [...]}}}``
        """
        if not size:
            size = 5
        query_base = {
            "size": size,
            "query": {"bool": {"must": []}},
        }
        must = query_base["query"]["bool"]["must"]

        if tokens:
            must.append(
                {
                    "simple_query_string": {
                        # "tokens": query,
                        "query": query,
                        "fields": ["tokens"],
                        "flags": "ALL",
                        "fuzzy_transpositions": True,
                        "fuzzy_max_expansions": 50,
                        "fuzzy_prefix_length": 0,
                        "default_operator": "and",
                        "analyzer": "standard",
                        "lenient": True,
                        "boost": 1,
                        "quote_field_suffix": "",
                        "analyze_wildcard": False,
                        "auto_generate_synonyms_phrase_query": False,
                    }
                }
            )
            # query["query"]["bool"]["must"].append(query_string)
            # query["query"]["bool"]["must"][0]["query_string"]["fields"] = ["tokens"]
        elif use_query_string:
            must.append(
                {
                    "query_string": {
                        "query": query,
                        # "fields": fields,
                        # "default_field": "msg",
                        # "type": "best_fields",
                        "fuzziness": "AUTO",
                        "fuzzy_transpositions": True,
                        "fuzzy_max_expansions": 50,
                        "fuzzy_prefix_length": 0,
                        # "minimum_should_match": 1,
                        "default_operator": "or",
                        "analyzer": "standard",
                        "lenient": True,
                        "boost": 1,
                        "allow_leading_wildcard": True,
                        # "enable_position_increments": False,
                        "phrase_slop": 3,
                        # "max_determinized_states": 10000,
                        "quote_field_suffix": "",
                        "quote_analyzer": "standard",
                        "analyze_wildcard": False,
                        "auto_generate_synonyms_phrase_query": True,
                    }
                }
            )
        return query_base

    def run_query(self, client, user, query, custom_query=False, index=None, size=None):
        """
        Low level helper to run an ES query.

        :param client: the Elasticsearch client to search with
        :param user: accepted for interface compatibility; not used here
        :param query: query text, or a complete search body when
            ``custom_query`` is true
        :param custom_query: treat ``query`` as a ready-made search body
        :param index: index to search; defaults to ``settings.INDEX_MAIN``
        :param size: number of results to return (only used when the body is
            constructed here)
        :return: the search response, or the caught ``RequestError`` /
            ``NotFoundError`` instance (returned, not raised)
        """
        if not index:
            index = settings.INDEX_MAIN
        if custom_query:
            search_query = query
        else:
            search_query = self.construct_query(query, size)
        try:
            response = client.search(body=search_query, index=index)
        except (RequestError, NotFoundError) as err:
            # Callers detect failure with isinstance(result, Exception).
            print("Elasticsearch error", err)
            return err
        return response

    def query_results(
        self,
        request,
        query_params,
        size=None,
        annotate=True,
        custom_query=False,
        reverse=False,
        dedup=False,
        dedup_fields=None,
        tags=None,
    ):
        """
        Validate ``query_params``, build the search body, run it and
        post-process the hits.

        :param request: the current request; ``request.user`` is used for
            permission checks and passed to ``self.query``
        :param query_params: mapping of search parameters (index, query,
            query_full, source, date/time range, sorting, sentiment, dedup)
        :param size: number of results; parsed from ``query_params`` if falsy
        :param annotate: run ``annotate_results`` on the parsed results
        :param custom_query: a ready-made search body, used when neither
            ``query`` nor ``query_full`` is present
        :param reverse: reverse the order of the parsed results
        :param dedup: see the NOTE in the dedup section below
        :param dedup_fields: fields to deduplicate on
        :param tags: mapping of field name to value, each added as a
            ``match_phrase`` clause
        :return: ``{"message": ..., "class": ...}`` on a validation or search
            error, ``False`` when the backend returned nothing, otherwise a
            context dict with ``object_list``, ``card`` and ``took``
        """
        query = None
        search_query = None
        add_bool = []
        add_top = []
        add_top_negative = []
        sort = None
        query_created = False

        helpers.add_defaults(query_params)

        # Now, run the helpers for SIQTSRSS/ADR
        # S - Size
        # I - Index
        # Q - Query
        # T - Tags
        # S - Source
        # R - Ranges
        # S - Sort
        # S - Sentiment
        # A - Annotate
        # D - Dedup
        # R - Reverse

        # S - Size
        if request.user.is_anonymous:
            sizes = settings.MAIN_SIZES_ANON
        else:
            sizes = settings.MAIN_SIZES
        if not size:
            size = self.parse_size(query_params, sizes)
            # parse_size reports an error as a dict; hand it back unchanged.
            if isinstance(size, dict):
                return size

        # I - Index
        if "index" in query_params:
            index = query_params["index"]
            if index == "main":
                index = settings.INDEX_MAIN
            else:
                if not request.user.has_perm(f"core.index_{index}"):
                    return {
                        "message": "Not permitted to search by this index",
                        "class": "danger",
                    }
                if index == "meta":
                    index = settings.INDEX_META
                elif index == "internal":
                    index = settings.INDEX_INT
                else:
                    return {
                        "message": "Index is not valid.",
                        "class": "danger",
                    }
        else:
            index = settings.INDEX_MAIN

        # Q/T - Query/Tags
        # Only one of query or query_full can be active at once
        # We prefer query because it's simpler
        if "query" in query_params:
            query = query_params["query"]
            search_query = self.construct_query(query, size, tokens=True)
            query_created = True
        elif "query_full" in query_params:
            query_full = query_params["query_full"]
            # if request.user.has_perm("core.query_search"):
            search_query = self.construct_query(query_full, size)
            query_created = True
            # else:
            #     message = "You cannot search by query string"
            #     message_class = "danger"
            #     return {"message": message, "class": message_class}
        elif custom_query:
            search_query = custom_query

        if tags:
            # Get a blank search query
            # NOTE(review): a custom_query does not set query_created, so it
            # is replaced by a blank query here when tags are given. Kept
            # as-is; confirm whether that is intended.
            if not query_created:
                search_query = self.construct_query(None, size, use_query_string=False)
                query_created = True
            for tagname, tagvalue in tags.items():
                add_bool.append({tagname: tagvalue})

        required_any = ["query_full", "query", "tags"]
        if not any(field in query_params for field in required_any):
            if not custom_query:
                return {"message": "Empty query!", "class": "warning"}

        # "tags" may be present in query_params while the tags argument is
        # empty, leaving nothing to search with; report it instead of failing
        # later on an unbound search body.
        if search_query is None:
            return {"message": "Empty query!", "class": "warning"}

        # S - Sources
        source = None
        if "source" in query_params:
            source = query_params["source"]

            if source in settings.SOURCES_RESTRICTED:
                if not request.user.has_perm("core.restricted_sources"):
                    return {"message": "Access denied", "class": "danger"}
            elif source not in settings.MAIN_SOURCES:
                return {"message": "Invalid source", "class": "danger"}

            if source == "all":
                source = None  # the next block will populate it

        if source:
            sources = [source]
        else:
            # Copy the list: appending to settings.MAIN_SOURCES itself would
            # permanently add the restricted sources to the shared setting and
            # expose them to every later request.
            sources = list(settings.MAIN_SOURCES)
            if request.user.has_perm("core.restricted_sources"):
                sources.extend(settings.SOURCES_RESTRICTED)

        add_top.append(
            {
                "bool": {
                    "should": [
                        {"match_phrase": {"src": source_iter}}
                        for source_iter in sources
                    ]
                }
            }
        )

        # R - Ranges
        # date_query = False
        if {"from_date", "to_date", "from_time", "to_time"}.issubset(
            query_params.keys()
        ):
            from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
            to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
            range_query = {
                "range": {
                    "ts": {
                        "gt": from_ts,
                        "lt": to_ts,
                    }
                }
            }
            add_top.append(range_query)

        # if date_query:
        #     if settings.DELAY_RESULTS:
        #         if source not in settings.SAFE_SOURCES:
        #             if request.user.has_perm("core.bypass_delay"):
        #                 add_top.append(range_query)
        #             else:
        #                 delay_as_ts = datetime.now() - timedelta(
        #                     days=settings.DELAY_DURATION
        #                 )
        #                 lt_as_ts = datetime.strptime(
        #                     range_query["range"]["ts"]["lt"], "%Y-%m-%dT%H:%MZ"
        #                 )
        #                 if lt_as_ts > delay_as_ts:
        #                     range_query["range"]["ts"][
        #                         "lt"
        #                     ] = f"now-{settings.DELAY_DURATION}d"
        #                 add_top.append(range_query)
        #     else:
        #         add_top.append(range_query)
        # else:
        #     if settings.DELAY_RESULTS:
        #         if source not in settings.SAFE_SOURCES:
        #             if not request.user.has_perm("core.bypass_delay"):
        #                 range_query = {
        #                     "range": {
        #                         "ts": {
        #                             # "gt": ,
        #                             "lt": f"now-{settings.DELAY_DURATION}d",
        #                         }
        #                     }
        #                 }
        #                 add_top.append(range_query)

        # S - Sort
        if "sorting" in query_params:
            sorting = query_params["sorting"]
            if sorting not in ("asc", "desc", "none"):
                return {"message": "Invalid sort", "class": "danger"}
            if sorting in ("asc", "desc"):
                sort = [
                    {
                        "ts": {
                            "order": sorting,
                        }
                    }
                ]

        # S - Sentiment
        if "check_sentiment" in query_params:
            if "sentiment_method" not in query_params:
                return {"message": "No sentiment method", "class": "danger"}
            sentiment = None
            if "sentiment" in query_params:
                try:
                    sentiment = float(query_params["sentiment"])
                except (TypeError, ValueError):
                    return {"message": "Sentiment is not a float", "class": "danger"}
            sentiment_method = query_params["sentiment_method"]

            # Every method except "nonzero" compares against the supplied
            # value, so it must be present.
            if sentiment is None and sentiment_method in ("below", "above", "exact"):
                return {"message": "Sentiment is not a float", "class": "danger"}

            if sentiment_method == "below":
                add_top.append({"range": {"sentiment": {"lt": sentiment}}})
            elif sentiment_method == "above":
                add_top.append({"range": {"sentiment": {"gt": sentiment}}})
            elif sentiment_method == "exact":
                add_top.append({"match": {"sentiment": sentiment}})
            elif sentiment_method == "nonzero":
                # Expressed as "must not match 0".
                add_top_negative.append({"match": {"sentiment": 0}})

        bool_clause = search_query["query"]["bool"]
        # if "bool" not in search_query["query"]:
        #     search_query["query"]["bool"] = {}
        # if "must" not in search_query["query"]["bool"]:
        #     search_query["query"]["bool"] = {"must": []}
        for item in add_bool:
            bool_clause["must"].append({"match_phrase": item})
        for item in add_top:
            bool_clause["must"].append(item)
        for item in add_top_negative:
            bool_clause.setdefault("must_not", []).append(item)
        if sort:
            search_query["sort"] = sort

        # self.query is not defined in this file (presumably provided by
        # StorageBackend -- verify).
        results = self.query(
            request.user,  # passed through run_main_query to filter_blacklisted
            search_query,
            custom_query=True,
            index=index,
            size=size,
        )
        if not results:
            return False
        if isinstance(results, Exception):
            message = f"Error: {results.info['error']['root_cause'][0]['type']}"
            return {"message": message, "class": "danger"}
        if not results["hits"]["hits"]:
            return {"message": "No results.", "class": "danger"}

        results_parsed = parse_results(results)

        # A/D/R - Annotate/Dedup/Reverse
        if annotate:
            annotate_results(results_parsed)
        # NOTE(review): the dedup argument is always overwritten here, so only
        # query_params["dedup"] == "on" enables deduplication. Behaviour kept
        # as-is; confirm whether the argument should act as a fallback.
        dedup = "dedup" in query_params and query_params["dedup"] == "on"

        if reverse:
            results_parsed = results_parsed[::-1]

        if dedup:
            if not dedup_fields:
                dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
            results_parsed = helpers.dedup_list(results_parsed, dedup_fields)

        # if source not in settings.SAFE_SOURCES:
        #     if settings.ENCRYPTION:
        #         encrypt_list(request.user, results_parsed, settings.ENCRYPTION_KEY)

        #     if settings.HASHING:
        #         hash_list(request.user, results_parsed)

        #     if settings.OBFUSCATION:
        #         obfuscate_list(request.user, results_parsed)

        #     if settings.RANDOMISATION:
        #         randomise_list(request.user, results_parsed)

        # process_list(results)

        # IMPORTANT! - DO NOT PASS query_params to the user!
        context = {
            "object_list": results_parsed,
            "card": results["hits"]["total"]["value"],
            "took": results["took"],
        }
        if "redacted" in results:
            context["redacted"] = results["redacted"]
        if "exemption" in results:
            context["exemption"] = results["exemption"]
        if query:
            context["query"] = query
        # if settings.DELAY_RESULTS:
        #     if source not in settings.SAFE_SOURCES:
        #         if not request.user.has_perm("core.bypass_delay"):
        #             context["delay"] = settings.DELAY_DURATION
        # if settings.RANDOMISATION:
        #     if source not in settings.SAFE_SOURCES:
        #         if not request.user.has_perm("core.bypass_randomisation"):
        #             context["randomised"] = True
        return context

    def query_single_result(self, request, query_params):
        """
        Run ``query_results`` with a size of 100 and expose the first result.

        :return: the ``query_results`` context with ``item`` set to the first
            entry of ``object_list`` when there is one, the error dict from
            ``query_results`` unchanged, or a failure dict when the query
            returned nothing
        """
        context = self.query_results(request, query_params, size=100)

        if not context:
            # "class" matches the key used by every other error dict in this
            # backend; "message_class" is kept for existing callers.
            return {
                "message": "Failed to run query",
                "class": "danger",
                "message_class": "danger",
            }
        if "message" in context:
            return context
        if context["object_list"]:
            context["item"] = context["object_list"][0]

        return context