From 0fd004ca7d5fa1cd41e0d7bd60186c71d27bba7e Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Wed, 23 Nov 2022 18:15:42 +0000 Subject: [PATCH] Finish reimplementing Elasticsearch --- core/__init__.py | 2 +- core/db/__init__.py | 32 ++- core/db/druid.py | 11 +- core/db/elastic.py | 483 +++++++++++++++---------------------- core/views/ui/drilldown.py | 14 +- 5 files changed, 218 insertions(+), 324 deletions(-) diff --git a/core/__init__.py b/core/__init__.py index 73d371f..9302e9b 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -2,7 +2,7 @@ import stripe from django.conf import settings from redis import StrictRedis -r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) +r = StrictRedis(unix_socket_path="/var/run/socks/redis.sock", db=0) if settings.STRIPE_TEST: stripe.api_key = settings.STRIPE_API_KEY_TEST diff --git a/core/db/__init__.py b/core/db/__init__.py index 5364877..8f9873d 100644 --- a/core/db/__init__.py +++ b/core/db/__init__.py @@ -97,22 +97,22 @@ class StorageBackend(ABC): index = settings.INDEX_MAIN return index - def parse_query(self, query_params, tags, size, index, custom_query, add_bool): + def parse_query(self, query_params, tags, size, custom_query, add_bool, **kwargs): query_created = False if "query" in query_params: query = query_params["query"] - search_query = self.construct_query(query, size, index) + search_query = self.construct_query(query, size, **kwargs) query_created = True else: if custom_query: search_query = custom_query else: - search_query = self.construct_query(None, size, index, blank=True) + search_query = self.construct_query(None, size, blank=True, **kwargs) if tags: # Get a blank search query if not query_created: - search_query = self.construct_query(None, size, index, blank=True) + search_query = self.construct_query(None, size, blank=True, **kwargs) query_created = True for item in tags: for tagname, tagvalue in item.items(): @@ -232,9 +232,7 @@ class StorageBackend(ABC): if blacklisted_type in item[data_index].keys(): content = item[data_index][blacklisted_type] # For every item in the blacklisted array for the type - for blacklisted_item in settings.ELASTICSEARCH_BLACKLISTED[ - blacklisted_type - ]: + for blacklisted_item in settings.BLACKLISTED[blacklisted_type]: if blacklisted_item == str(content): # Remove the item if item in response["hits"]["hits"]: @@ -259,7 +257,7 @@ class StorageBackend(ABC): # Actually get rid of all the things we set to None response["hits"]["hits"] = [hit for hit in response["hits"]["hits"] if hit] - def query(self, user, search_query): + def query(self, user, search_query, **kwargs): # For time tracking start = time.process_time() if settings.CACHE: @@ -281,7 +279,19 @@ class StorageBackend(ABC): "took": time_took_rounded, "cache": True, } - response = self.run_query(user, search_query) + response = self.run_query(user, search_query, **kwargs) + + # For Elasticsearch + if isinstance(response, Exception): + message = f"Error: {response.info['error']['root_cause'][0]['type']}" + message_class = "danger" + return {"message": message, "class": message_class} + if len(response["hits"]["hits"]) == 0: + message = "No results." + message_class = "danger" + return {"message": message, "class": message_class} + + # For Druid if "error" in response: if "errorMessage" in response: context = { @@ -296,7 +306,9 @@ class StorageBackend(ABC): if "took" in response: if response["took"] is None: return None - self.filter_blacklisted(user, response) + + # Removed for now, no point given we have restricted indexes + # self.filter_blacklisted(user, response) # Parse the response response_parsed = self.parse(response) diff --git a/core/db/druid.py b/core/db/druid.py index d35305a..99d92f9 100644 --- a/core/db/druid.py +++ b/core/db/druid.py @@ -77,7 +77,8 @@ class DruidBackend(StorageBackend): self.add_type("or", search_query, extra_should2) return search_query - def construct_query(self, query, size, index, blank=False): + def construct_query(self, query, size, blank=False, **kwargs): + index = kwargs.get("index") search_query = { "limit": size, "queryType": "scan", @@ -107,19 +108,13 @@ class DruidBackend(StorageBackend): def parse(self, response): parsed = parse_druid(response) - print("PARSE LEN", len(parsed)) return parsed def run_query(self, user, search_query): ss = orjson.dumps(search_query, option=orjson.OPT_INDENT_2) ss = ss.decode() - print(ss) response = requests.post("http://druid:8082/druid/v2", json=search_query) response = orjson.loads(response.text) - print("RESPONSE LEN", len(response)) - # ss = orjson.dumps(response, option=orjson.OPT_INDENT_2) - # ss = ss.decode() - # print(ss) return response def filter_blacklisted(self, user, response): @@ -172,7 +167,7 @@ class DruidBackend(StorageBackend): # Q/T - Query/Tags search_query = self.parse_query( - query_params, tags, size, index, custom_query, add_bool + query_params, tags, size, custom_query, add_bool, index=index ) # Query should be a dict, so check if it contains message here if "message" in search_query: diff --git a/core/db/elastic.py b/core/db/elastic.py index 2eb72dd..0f850a4 100644 --- a/core/db/elastic.py +++ b/core/db/elastic.py @@ -9,7 +9,7 @@ from core.db import StorageBackend # from json import dumps # pp = lambda x: print(dumps(x, indent=2)) -from core.db.processing import annotate_results, parse_results +from core.db.processing import parse_results from core.views import helpers @@ -23,27 +23,114 @@ class ElasticsearchBackend(StorageBackend): """ auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD) client = Elasticsearch( - # fmt: off - hosts=[{"host": settings.ELASTICSEARCH_URL, - "port": settings.ELASTICSEARCH_PORT}], - http_compress=False, # enables gzip compression for request bodies - http_auth=auth, - # client_cert = client_cert_path, - # client_key = client_key_path, - use_ssl=settings.ELASTICSEARCH_TLS, - verify_certs=False, - ssl_assert_hostname=False, - ssl_show_warn=False, - # a_certs=ca_certs_path, + settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False ) self.client = client - def construct_query(self, query, size, use_query_string=True, tokens=False): + def construct_context_query( + self, index, net, channel, src, num, size, type=None, nicks=None + ): + # Get the initial query + query = self.construct_query(None, size, blank=True) + + extra_must = [] + extra_should = [] + extra_should2 = [] + if num: + extra_must.append({"equals": {"num": num}}) + if net: + extra_must.append({"match_phrase": {"net": net}}) + if channel: + extra_must.append({"match": {"channel": channel}}) + if nicks: + for nick in nicks: + extra_should2.append({"match": {"nick": nick}}) + + types = ["msg", "notice", "action", "kick", "topic", "mode"] + fields = [ + "nick", + "ident", + "host", + "channel", + "ts", + "msg", + "type", + "net", + "src", + "tokens", + ] + query["fields"] = fields + + if index == "internal": + fields.append("mtype") + if channel == "*status" or type == "znc": + if {"match": {"channel": channel}} in extra_must: + extra_must.remove({"match": {"channel": channel}}) + extra_should2 = [] + # Type is one of msg or notice + # extra_should.append({"match": {"mtype": "msg"}}) + # extra_should.append({"match": {"mtype": "notice"}}) + extra_should.append({"match": {"type": "znc"}}) + extra_should.append({"match": {"type": "self"}}) + + extra_should2.append({"match": {"type": "znc"}}) + extra_should2.append({"match": {"nick": channel}}) + elif type == "auth": + if {"match": {"channel": channel}} in extra_must: + extra_must.remove({"match": {"channel": channel}}) + extra_should2 = [] + extra_should2.append({"match": {"nick": channel}}) + # extra_should2.append({"match": {"mtype": "msg"}}) + # extra_should2.append({"match": {"mtype": "notice"}}) + + extra_should.append({"match": {"type": "query"}}) + extra_should2.append({"match": {"type": "self"}}) + extra_should.append({"match": {"nick": channel}}) + else: + for ctype in types: + extra_should.append({"equals": {"mtype": ctype}}) + else: + for ctype in types: + extra_should.append({"match": {"type": ctype}}) + # query = { + # "index": index, + # "limit": size, + # "query": { + # "bool": { + # "must": [ + # # {"equals": {"src": src}}, + # # { + # # "bool": { + # # "should": [*extra_should], + # # } + # # }, + # # { + # # "bool": { + # # "should": [*extra_should2], + # # } + # # }, + # *extra_must, + # ] + # } + # }, + # "fields": fields, + # # "_source": False, + # } + if extra_must: + for x in extra_must: + query["query"]["bool"]["must"].append(x) + if extra_should: + query["query"]["bool"]["must"].append({"bool": {"should": [*extra_should]}}) + if extra_should2: + query["query"]["bool"]["must"].append( + {"bool": {"should": [*extra_should2]}} + ) + return query + + def construct_query(self, query, size, blank=False): """ Accept some query parameters and construct an Elasticsearch query. """ - if not size: - size = 5 query_base = { "size": size, "query": {"bool": {"must": []}}, @@ -73,33 +160,15 @@ class ElasticsearchBackend(StorageBackend): "auto_generate_synonyms_phrase_query": True, } } - query_tokens = { - "simple_query_string": { - # "tokens": query, - "query": query, - "fields": ["tokens"], - "flags": "ALL", - "fuzzy_transpositions": True, - "fuzzy_max_expansions": 50, - "fuzzy_prefix_length": 0, - "default_operator": "and", - "analyzer": "standard", - "lenient": True, - "boost": 1, - "quote_field_suffix": "", - "analyze_wildcard": False, - "auto_generate_synonyms_phrase_query": False, - } - } - if tokens: - query_base["query"]["bool"]["must"].append(query_tokens) - # query["query"]["bool"]["must"].append(query_string) - # query["query"]["bool"]["must"][0]["query_string"]["fields"] = ["tokens"] - elif use_query_string: + if not blank: query_base["query"]["bool"]["must"].append(query_string) return query_base - def run_query(self, client, user, query, custom_query=False, index=None, size=None): + def parse(self, response): + parsed = parse_results(response) + return parsed + + def run_query(self, user, search_query, **kwargs): """ Low level helper to run an ES query. Accept a user to pass it to the filter, so we can @@ -107,14 +176,9 @@ class ElasticsearchBackend(StorageBackend): Accept fields and size, for the fields we want to match and the number of results to return. """ - if not index: - index = settings.INDEX_MAIN - if custom_query: - search_query = query - else: - search_query = self.construct_query(query, size) + index = kwargs.get("index") try: - response = client.search(body=search_query, index=index) + response = self.client.search(body=search_query, index=index) except RequestError as err: print("Elasticsearch error", err) return err @@ -136,14 +200,9 @@ class ElasticsearchBackend(StorageBackend): tags=None, ): - query = None - message = None - message_class = None add_bool = [] add_top = [] add_top_negative = [] - sort = None - query_created = False helpers.add_defaults(query_params) @@ -171,106 +230,36 @@ class ElasticsearchBackend(StorageBackend): return size # I - Index - if "index" in query_params: - index = query_params["index"] - if index == "main": - index = settings.INDEX_MAIN - else: - if not request.user.has_perm(f"core.index_{index}"): - message = "Not permitted to search by this index" - message_class = "danger" - return { - "message": message, - "class": message_class, - } - if index == "meta": - index = settings.INDEX_META - elif index == "internal": - index = settings.INDEX_INT - else: - message = "Index is not valid." - message_class = "danger" - return { - "message": message, - "class": message_class, - } - - else: - index = settings.INDEX_MAIN + index = self.parse_index(request.user, query_params) + if isinstance(index, dict): + return index # Q/T - Query/Tags - # Only one of query or query_full can be active at once - # We prefer query because it's simpler - if "query" in query_params: - query = query_params["query"] - search_query = self.construct_query(query, size, tokens=True) - query_created = True - elif "query_full" in query_params: - query_full = query_params["query_full"] - # if request.user.has_perm("core.query_search"): - search_query = self.construct_query(query_full, size) - query_created = True - # else: - # message = "You cannot search by query string" - # message_class = "danger" - # return {"message": message, "class": message_class} - else: - if custom_query: - search_query = custom_query - - if tags: - # Get a blank search query - if not query_created: - search_query = self.construct_query(None, size, use_query_string=False) - query_created = True - for tagname, tagvalue in tags.items(): - add_bool.append({tagname: tagvalue}) - - required_any = ["query_full", "query", "tags"] - if not any([field in query_params.keys() for field in required_any]): - if not custom_query: - message = "Empty query!" - message_class = "warning" - return {"message": message, "class": message_class} + search_query = self.parse_query( + query_params, tags, size, custom_query, add_bool + ) + # Query should be a dict, so check if it contains message here + if "message" in search_query: + return search_query # S - Sources - source = None - if "source" in query_params: - source = query_params["source"] - - if source in settings.SOURCES_RESTRICTED: - if not request.user.has_perm("core.restricted_sources"): - message = "Access denied" - message_class = "danger" - return {"message": message, "class": message_class} - elif source not in settings.MAIN_SOURCES: - message = "Invalid source" - message_class = "danger" - return {"message": message, "class": message_class} - - if source == "all": - source = None # the next block will populate it - - if source: - sources = [source] - else: - sources = settings.MAIN_SOURCES - if request.user.has_perm("core.restricted_sources"): - for source_iter in settings.SOURCES_RESTRICTED: - sources.append(source_iter) - - add_top_tmp = {"bool": {"should": []}} - for source_iter in sources: - add_top_tmp["bool"]["should"].append({"match_phrase": {"src": source_iter}}) - add_top.append(add_top_tmp) + sources = self.parse_source(request.user, query_params) + if isinstance(sources, dict): + return sources + total_count = len(sources) + total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED) + if total_count != total_sources: + add_top_tmp = {"bool": {"should": []}} + for source_iter in sources: + add_top_tmp["bool"]["should"].append( + {"match_phrase": {"src": source_iter}} + ) + add_top.append(add_top_tmp) # R - Ranges # date_query = False - if set({"from_date", "to_date", "from_time", "to_time"}).issubset( - query_params.keys() - ): - from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z" - to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z" + from_ts, to_ts = self.parse_date_time(query_params) + if from_ts: range_query = { "range": { "ts": { @@ -281,70 +270,28 @@ class ElasticsearchBackend(StorageBackend): } add_top.append(range_query) - # if date_query: - # if settings.DELAY_RESULTS: - # if source not in settings.SAFE_SOURCES: - # if request.user.has_perm("core.bypass_delay"): - # add_top.append(range_query) - # else: - # delay_as_ts = datetime.now() - timedelta( - # days=settings.DELAY_DURATION - # ) - # lt_as_ts = datetime.strptime( - # range_query["range"]["ts"]["lt"], "%Y-%m-%dT%H:%MZ" - # ) - # if lt_as_ts > delay_as_ts: - # range_query["range"]["ts"][ - # "lt" - # ] = f"now-{settings.DELAY_DURATION}d" - # add_top.append(range_query) - # else: - # add_top.append(range_query) - # else: - # if settings.DELAY_RESULTS: - # if source not in settings.SAFE_SOURCES: - # if not request.user.has_perm("core.bypass_delay"): - # range_query = { - # "range": { - # "ts": { - # # "gt": , - # "lt": f"now-{settings.DELAY_DURATION}d", - # } - # } - # } - # add_top.append(range_query) - # S - Sort - if "sorting" in query_params: - sorting = query_params["sorting"] - if sorting not in ("asc", "desc", "none"): - message = "Invalid sort" - message_class = "danger" - return {"message": message, "class": message_class} - if sorting in ("asc", "desc"): - sort = [ - { - "ts": { - "order": sorting, - } + sort = self.parse_sort(query_params) + if isinstance(sort, dict): + return sort + if sort: + # For Druid compatibility + sort_map = {"ascending": "asc", "descending": "desc"} + sorting = [ + { + "ts": { + "order": sort_map[sort], } - ] + } + ] + search_query["sort"] = sorting # S - Sentiment - if "check_sentiment" in query_params: - if "sentiment_method" not in query_params: - message = "No sentiment method" - message_class = "danger" - return {"message": message, "class": message_class} - if "sentiment" in query_params: - sentiment = query_params["sentiment"] - try: - sentiment = float(sentiment) - except ValueError: - message = "Sentiment is not a float" - message_class = "danger" - return {"message": message, "class": message_class} - sentiment_method = query_params["sentiment_method"] + sentiment_r = self.parse_sentiment(query_params) + if isinstance(sentiment_r, dict): + return sentiment_r + if sentiment_r: + sentiment_method, sentiment = sentiment_r range_query_compare = {"range": {"sentiment": {}}} range_query_precise = { "match": { @@ -364,100 +311,27 @@ class ElasticsearchBackend(StorageBackend): range_query_precise["match"]["sentiment"] = 0 add_top_negative.append(range_query_precise) - if add_bool: - # if "bool" not in search_query["query"]: - # search_query["query"]["bool"] = {} - # if "must" not in search_query["query"]["bool"]: - # search_query["query"]["bool"] = {"must": []} - - for item in add_bool: - search_query["query"]["bool"]["must"].append({"match_phrase": item}) - if add_top: - for item in add_top: - search_query["query"]["bool"]["must"].append(item) - if add_top_negative: - for item in add_top_negative: - if "must_not" in search_query["query"]["bool"]: - search_query["query"]["bool"]["must_not"].append(item) - else: - search_query["query"]["bool"]["must_not"] = [item] - if sort: - search_query["sort"] = sort + # Add in the additional information we already populated + self.add_bool(search_query, add_bool) + self.add_top(search_query, add_top) + self.add_top(search_query, add_top_negative, negative=True) - results = self.query( - request.user, # passed through run_main_query to filter_blacklisted + response = self.query( + request.user, search_query, - custom_query=True, index=index, - size=size, ) - if not results: - return False - if isinstance(results, Exception): - message = f"Error: {results.info['error']['root_cause'][0]['type']}" - message_class = "danger" - return {"message": message, "class": message_class} - if len(results["hits"]["hits"]) == 0: - message = "No results." - message_class = "danger" - return {"message": message, "class": message_class} - - results_parsed = parse_results(results) # A/D/R - Annotate/Dedup/Reverse - if annotate: - annotate_results(results_parsed) - if "dedup" in query_params: - if query_params["dedup"] == "on": - dedup = True - else: - dedup = False - else: - dedup = False - - if reverse: - results_parsed = results_parsed[::-1] - - if dedup: - if not dedup_fields: - dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"] - results_parsed = helpers.dedup_list(results_parsed, dedup_fields) - - # if source not in settings.SAFE_SOURCES: - # if settings.ENCRYPTION: - # encrypt_list(request.user, results_parsed, settings.ENCRYPTION_KEY) - - # if settings.HASHING: - # hash_list(request.user, results_parsed) - - # if settings.OBFUSCATION: - # obfuscate_list(request.user, results_parsed) - - # if settings.RANDOMISATION: - # randomise_list(request.user, results_parsed) - - # process_list(results) + self.process_results( + response, + annotate=annotate, + dedup=dedup, + dedup_fields=dedup_fields, + reverse=reverse, + ) - # IMPORTANT! - DO NOT PASS query_params to the user! - context = { - "object_list": results_parsed, - "card": results["hits"]["total"]["value"], - "took": results["took"], - } - if "redacted" in results: - context["redacted"] = results["redacted"] - if "exemption" in results: - context["exemption"] = results["exemption"] - if query: - context["query"] = query - # if settings.DELAY_RESULTS: - # if source not in settings.SAFE_SOURCES: - # if not request.user.has_perm("core.bypass_delay"): - # context["delay"] = settings.DELAY_DURATION - # if settings.RANDOMISATION: - # if source not in settings.SAFE_SOURCES: - # if not request.user.has_perm("core.bypass_randomisation"): - # context["randomised"] = True + context = response return context def query_single_result(self, request, query_params): @@ -472,3 +346,28 @@ class ElasticsearchBackend(StorageBackend): context["item"] = context["object_list"][0] return context + + def add_bool(self, search_query, add_bool): + """ + Add the specified boolean matches to search query. + """ + if not add_bool: + return + for item in add_bool: + search_query["query"]["bool"]["must"].append({"match_phrase": item}) + + def add_top(self, search_query, add_top, negative=False): + """ + Merge add_top with the base of the search_query. + """ + if not add_top: + return + if negative: + for item in add_top: + if "must_not" in search_query["query"]["bool"]: + search_query["query"]["bool"]["must_not"].append(item) + else: + search_query["query"]["bool"]["must_not"] = [item] + else: + for item in add_top: + search_query["query"]["bool"]["must"].append(item) diff --git a/core/views/ui/drilldown.py b/core/views/ui/drilldown.py index ab01165..048ce65 100644 --- a/core/views/ui/drilldown.py +++ b/core/views/ui/drilldown.py @@ -377,6 +377,7 @@ class DrilldownContextModal(APIView): type=type, nicks=nicks_sensitive, ) + print("QUERY", search_query) results = db.query_results( request, query_params, @@ -389,19 +390,6 @@ class DrilldownContextModal(APIView): if "message" in results: return render(request, self.template_name, results) - # if settings.HASHING: # we probably want to see the tokens - # if query_params["source"] not in settings.SAFE_SOURCES: - # if not request.user.has_perm("core.bypass_hashing"): - # for index, item in enumerate(results["object_list"]): - # if "tokens" in item: - # results["object_list"][index]["msg"] = results[ - # "object_list" - # ][index].pop("tokens") - # # item["msg"] = item.pop("tokens") - - # Make the time nicer - # for index, item in enumerate(results["object_list"]): - # results["object_list"][index]["time"] = item["time"]+"SSS" unique = str(uuid.uuid4())[:8] context = { "net": query_params["net"],