diff --git a/core/db/__init__.py b/core/db/__init__.py
index 64b201f..0f51172 100644
--- a/core/db/__init__.py
+++ b/core/db/__init__.py
@@ -1,6 +1,7 @@
 import random
 import string
 import time
+from datetime import datetime
 from math import floor, log10
 
 import orjson
@@ -10,6 +11,7 @@ from siphashc import siphash
 from core import r
 from core.db.processing import annotate_results
 from core.util import logs
+from core.views import helpers
 
 
 class StorageBackend(object):
@@ -71,6 +73,15 @@ class StorageBackend(object):
             index = settings.INDEX_META
         elif index == "internal":
             index = settings.INDEX_INT
+        elif index == "restricted":
+            if not user.has_perm("core.restricted_sources"):
+                message = "Not permitted to search by this index"
+                message_class = "danger"
+                return {
+                    "message": message,
+                    "class": message_class,
+                }
+            index = settings.INDEX_RESTRICTED
         else:
             message = "Index is not valid."
             message_class = "danger"
@@ -83,6 +94,7 @@ class StorageBackend(object):
         return index
 
     def parse_query(self, query_params, tags, size, index, custom_query, add_bool):
+        query_created = False
         if "query" in query_params:
             query = query_params["query"]
             search_query = self.construct_query(query, size, index)
@@ -90,6 +102,8 @@ class StorageBackend(object):
         else:
             if custom_query:
                 search_query = custom_query
+            else:
+                search_query = self.construct_query(None, size, index, blank=True)
 
         if tags:
             # Get a blank search query
@@ -99,6 +113,13 @@ class StorageBackend(object):
             for tagname, tagvalue in tags.items():
                 add_bool.append({tagname: tagvalue})
 
+        valid = self.check_valid_query(query_params, custom_query)
+        if isinstance(valid, dict):
+            return valid
+
+        return search_query
+
+    def check_valid_query(self, query_params, custom_query):
         required_any = ["query", "tags"]
         if not any([field in query_params.keys() for field in required_any]):
             if not custom_query:
@@ -106,8 +127,6 @@ class StorageBackend(object):
                 message_class = "warning"
                 return {"message": message, "class": message_class}
 
-        return search_query
-
     def parse_source(self, user, query_params):
         if "source" in query_params:
             source = query_params["source"]
@@ -133,11 +152,59 @@ class StorageBackend(object):
             for source_iter in settings.SOURCES_RESTRICTED:
                 sources.append(source_iter)
 
+        if "all" in sources:
+            sources.remove("all")
+
         return sources
 
+    def parse_sort(self, query_params):
+        sort = None
+        if "sorting" in query_params:
+            sorting = query_params["sorting"]
+            if sorting not in ("asc", "desc", "none"):
+                message = "Invalid sort"
+                message_class = "danger"
+                return {"message": message, "class": message_class}
+            if sorting == "asc":
+                sort = "ascending"
+            elif sorting == "desc":
+                sort = "descending"
+        return sort
+
+    def parse_date_time(self, query_params):
+        if set({"from_date", "to_date", "from_time", "to_time"}).issubset(
+            query_params.keys()
+        ):
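+            # Assumed inputs (illustrative values): from_date="2022-01-01"
+            # and from_time="12:30" build "2022-01-01T12:30Z", which matches
+            # the "%Y-%m-%dT%H:%MZ" format passed to strptime below.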
+            from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
+            to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
+            from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ")
+            to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ")
+
+            return (from_ts, to_ts)
+        return (None, None)
+
+    def parse_sentiment(self, query_params):
+        # Return None early so callers can treat "no sentiment requested"
+        # as falsy, and sentiment_method is never referenced unbound
+        if "check_sentiment" not in query_params:
+            return None
+        sentiment = None
+        if "sentiment_method" not in query_params:
+            message = "No sentiment method"
+            message_class = "danger"
+            return {"message": message, "class": message_class}
+        if "sentiment" in query_params:
+            sentiment = query_params["sentiment"]
+            try:
+                sentiment = float(sentiment)
+            except ValueError:
+                message = "Sentiment is not a float"
+                message_class = "danger"
+                return {"message": message, "class": message_class}
+        sentiment_method = query_params["sentiment_method"]
+
+        return (sentiment_method, sentiment)
+
     def filter_blacklisted(self, user, response):
         """
-        Low level filter to take the raw OpenSearch response and remove
+        Low level filter to take the raw search response and remove
         objects from it we want to keep secret.
         Does not return, the object is mutated in place.
         """
@@ -197,11 +264,28 @@ class StorageBackend(object):
             cache_hit = r.get(f"query_cache.{user.id}.{hash}")
             if cache_hit:
                 response = orjson.loads(cache_hit)
-                response["cache"] = True
-                return response
+                print("CACHE HIT", response)
+
+                time_took = (time.process_time() - start) * 1000
+                # Round to 3 significant figures
+                time_took_rounded = round(
+                    time_took, 3 - int(floor(log10(abs(time_took)))) - 1
+                )
+                return {
+                    "object_list": response,
+                    "took": time_took_rounded,
+                    "cache": True,
+                }
 
         response = self.run_query(user, search_query)
-        if "error" in response and len(response.keys()) == 1:
-            return response
+        if "error" in response:
+            if "errorMessage" in response:
+                context = {
+                    "message": response["errorMessage"],
+                    "class": "danger",
+                }
+                return context
+            else:
+                return response
         # response = response.to_dict()
         # print("RESP", response)
         if "took" in response:
@@ -209,15 +293,15 @@ class StorageBackend(object):
             return None
 
         self.filter_blacklisted(user, response)
 
+        # Parse the response
+        response_parsed = self.parse(response)
+
         # Write cache
         if settings.CACHE:
-            to_write_cache = orjson.dumps(response)
+            to_write_cache = orjson.dumps(response_parsed)
             r.set(f"query_cache.{user.id}.{hash}", to_write_cache)
             r.expire(f"query_cache.{user.id}.{hash}", settings.CACHE_TIMEOUT)
 
-        # Parse the response
-        response_parsed = self.parse(response)
-
         time_took = (time.process_time() - start) * 1000
         # Round to 3 significant figures
         time_took_rounded = round(time_took, 3 - int(floor(log10(abs(time_took)))) - 1)
@@ -226,9 +310,15 @@ class StorageBackend(object):
     def query_results(self, **kwargs):
         raise NotImplementedError
 
-    def process_results(self, **kwargs):
+    def process_results(self, response, **kwargs):
         if kwargs.get("annotate"):
-            annotate_results(kwargs["results"])
+            annotate_results(response)
+        if kwargs.get("reverse"):
+            response = response[::-1]
+        if kwargs.get("dedup"):
+            dedup_fields = kwargs.get("dedup_fields")
+            if not dedup_fields:
+                dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
+            response = helpers.dedup_list(response, dedup_fields)
+        return response
 
     def parse(self, response):
         raise NotImplementedError
diff --git a/core/db/druid.py b/core/db/druid.py
index 153c984..0479407 100644
--- a/core/db/druid.py
+++ b/core/db/druid.py
@@ -1,17 +1,9 @@
 import logging
-import random
-import string
-import time
-from datetime import datetime
-from math import floor, log10
-from pprint import pprint
 
 import orjson
 import requests
 from django.conf import settings
-from siphashc import siphash
 
-from core import r
 from core.db import StorageBackend
 from core.db.processing import parse_druid
 from core.views import helpers
@@ -32,30 +24,26 @@ class DruidBackend(StorageBackend):
             "limit": size,
             "queryType": "scan",
             "dataSource": index,
-            "filter": {
-                "type": "and",
-                "fields": [
+            "intervals": ["1999-01-01/2999-01-01"],
+        }
 
-                ],
+        base_filter = {
+            "type": "and",
+            "fields": [],
+        }
+        to_add = {
+            "type": "search",
+            "dimension": "msg",
+            "query": {
+                "type": "insensitive_contains",
+                "value": query,
             },
-            # "resultFormat": "list",
-            # "columns":[],
-            "intervals": ["1000-01-01/3000-01-01"],
-            # "batchSize": 20480,
         }
-        to_add = {
-            "type": "search",
-            "dimension": "msg",
-            "query": {
-                "type": "insensitive_contains",
-                "value": query,
-            },
-        },
-
         if blank:
             return search_query
         else:
+            search_query["filter"] = base_filter
             search_query["filter"]["fields"].append(to_add)
             return search_query
@@ -65,12 +53,15 @@ class DruidBackend(StorageBackend):
         return parsed
 
     def run_query(self, user, search_query):
+        ss = orjson.dumps(search_query, option=orjson.OPT_INDENT_2)
+        ss = ss.decode()
+        print(ss)
         response = requests.post("http://broker:8082/druid/v2", json=search_query)
         response = orjson.loads(response.text)
         print("RESPONSE LEN", len(response))
-        ss = orjson.dumps(list(response), option=orjson.OPT_INDENT_2)
-        ss = ss.decode()
-        print(ss)
+        # ss = orjson.dumps(response, option=orjson.OPT_INDENT_2)
+        # ss = ss.decode()
+        # print(ss)
         return response
 
     def filter_blacklisted(self, user, response):
@@ -89,12 +80,24 @@ class DruidBackend(StorageBackend):
         tags=None,
     ):
         add_bool = []
-        add_top = []
-
+        add_in = {}
 
         helpers.add_defaults(query_params)
 
-        # Check size
+        # Now, run the helpers for SIQTSRSS/ADR
+        # S - Size
+        # I - Index
+        # Q - Query
+        # T - Tags
+        # S - Source
+        # R - Ranges
+        # S - Sort
+        # S - Sentiment
+        # A - Annotate
+        # D - Dedup
+        # R - Reverse
+
+        # S - Size
        if request.user.is_anonymous:
             sizes = settings.MAIN_SIZES_ANON
         else:
@@ -104,37 +107,80 @@ class DruidBackend(StorageBackend):
         if isinstance(size, dict):
             return size
 
-        # Check index
+        # I - Index
         index = self.parse_index(request.user, query_params)
         if isinstance(index, dict):
             return index
 
-        # Create the search query
-        search_query = self.parse_query(query_params, tags, size, index, custom_query, add_bool)
-        if isinstance(search_query, dict):
+        # Q/T - Query/Tags
+        search_query = self.parse_query(
+            query_params, tags, size, index, custom_query, add_bool
+        )
+        # The query is a dict either way, so check for an error message key
+        if "message" in search_query:
             return search_query
 
+        # S - Sources
         sources = self.parse_source(request.user, query_params)
-        # TODO
-        add_top_tmp = {"bool": {"should": []}}
-        total_count = 0
-        for source_iter in sources:
-            add_top_tmp["bool"]["should"].append({"equals": {"src": source_iter}})
-            total_count += 1
-        total_sources = len(settings.MAIN_SOURCES) + len(
-            settings.SOURCES_RESTRICTED
-        )
-        if not total_count == total_sources:
-            add_top.append(add_top_tmp)
-
-        print("SIZE IS", size)
-
+        if isinstance(sources, dict):
+            return sources
+        total_count = len(sources)
+        total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
+        if total_count != total_sources:
+            add_in["src"] = sources
+
+        # R - Ranges
+        from_ts, to_ts = self.parse_date_time(query_params)
+        if from_ts:
+            addendum = f"{from_ts}/{to_ts}"
+            search_query["intervals"] = [addendum]
+
+        # S - Sort
+        sort = self.parse_sort(query_params)
+        if isinstance(sort, dict):
+            return sort
+        if sort:
+            search_query["order"] = sort
+
+        # S - Sentiment
+        sentiment_r = self.parse_sentiment(query_params)
+        if isinstance(sentiment_r, dict):
+            return sentiment_r
+        if sentiment_r:
+            sentiment_method, sentiment = sentiment_r
+            sentiment_query = {"type": "bound", "dimension": "sentiment"}
+            # Bound filters keep rows whose value lies between "lower" and
+            # "upper" (exclusive when the strict flags are set)
+            if sentiment_method == "below":
+                sentiment_query["upper"] = sentiment
+            elif sentiment_method == "above":
+                sentiment_query["lower"] = sentiment
+            elif sentiment_method == "exact":
+                sentiment_query["lower"] = sentiment
+                sentiment_query["upper"] = sentiment
+            elif sentiment_method == "nonzero":
+                sentiment_query["lower"] = -0.0001
+                sentiment_query["upper"] = 0.0001
+                sentiment_query["lowerStrict"] = True
+                sentiment_query["upperStrict"] = True
+                # Negate the near-zero band, otherwise this bound would
+                # select only values close to zero
+                sentiment_query = {"type": "not", "field": sentiment_query}
+            # add_bool.append(sentiment_query)
+            self.add_filter(search_query)
+            search_query["filter"]["fields"].append(sentiment_query)
+
+        # Add in the additional information we already populated
         if add_bool:
-            self.add_bool(search_query, add_bool)
+            self.add_type("and", search_query, add_bool)
+        if add_in:
+            self.add_in(search_query, add_in)
 
         response = self.query(request.user, search_query)
-        # print("RESP", response)
 
+        # A/D/R - Annotate/Dedup/Reverse
+        response = self.process_results(
+            response,
+            annotate=annotate,
+            dedup=dedup,
+            dedup_fields=dedup_fields,
+            reverse=reverse,
+        )
         # ss = orjson.dumps(list(response), option=orjson.OPT_INDENT_2)
         # ss = ss.decode()
         # print(ss)
@@ -143,11 +189,29 @@ class DruidBackend(StorageBackend):
         context = response
         return context
 
-    def add_bool(self, search_query, add_bool):
-        if "filter" in search_query:
-            if "fields" in search_query["filter"]:
-                search_query["filter"]["fields"].append({"bool": {"should": add_bool}})
-            else:
-                search_query["filter"]["fields"] = [{"bool": {"should": add_bool}}]
-        else:
-            search_query["filter"] = {"bool": {"should": add_bool}}
+    def add_filter(self, search_query):
+        if "filter" not in search_query:
+            search_query["filter"] = {
+                "type": "and",
+                "fields": [],
+            }
+
+    def add_in(self, search_query, add_in):
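+        # Example (hypothetical input): {"src": ["irc", "dis"]} becomes
+        # {"type": "in", "dimension": "src", "values": ["irc", "dis"]},
+        # matching rows whose src is any of the listed values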
+        self.add_filter(search_query)
+        for key, value in add_in.items():
+            to_add = {"type": "in", "dimension": key, "values": value}
+            search_query["filter"]["fields"].append(to_add)
+
+    def add_type(self, gate, search_query, add_bool):
+        top_level_bool = {"type": gate, "fields": []}
+        self.add_filter(search_query)
+        for item in add_bool:
+            for key, value in item.items():
+                to_add = {"type": "selector", "dimension": key, "value": value}
+                top_level_bool["fields"].append(to_add)
+
+        search_query["filter"]["fields"].append(top_level_bool)
+
+    def check_valid_query(self, query_params, custom_query):
+        # We can do blank queries with this data source
+        pass
diff --git a/core/db/manticore.py b/core/db/manticore.py
index 81211b5..0eee282 100644
--- a/core/db/manticore.py
+++ b/core/db/manticore.py
@@ -1,18 +1,12 @@
 import logging
-import random
-import string
-import time
 from datetime import datetime
-from math import floor, log10
 from pprint import pprint
 
-import orjson
 import requests
 from django.conf import settings
 
-from core import r
 from core.db import StorageBackend
-from core.db.processing import annotate_results, filter_blacklisted, parse_results
+from core.db.processing import annotate_results, parse_results
 from core.views import helpers
 
 logger = logging.getLogger(__name__)
@@ -120,7 +114,7 @@ class ManticoreBackend(StorageBackend):
         # Create the search query
         if "query" in query_params:
             query = query_params["query"]
-            search_query = construct_query(query, size, index)
+            search_query = self.construct_query(query, size, index)
             query_created = True
         else:
             if custom_query:
@@ -129,7 +123,7 @@ class ManticoreBackend(StorageBackend):
         if tags:
             # Get a blank search query
             if not query_created:
-                search_query = construct_query(None, size, index, blank=True)
+                search_query = self.construct_query(None, size, index, blank=True)
                 query_created = True
             for tagname, tagvalue in tags.items():
                 add_bool.append({tagname: tagvalue})
@@ -171,9 +165,7 @@ class ManticoreBackend(StorageBackend):
         for source_iter in sources:
             add_top_tmp["bool"]["should"].append({"equals": {"src": source_iter}})
             total_count += 1
-        total_sources = len(settings.MAIN_SOURCES) + len(
-            settings.SOURCES_RESTRICTED
-        )
+        total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
         if not total_count == total_sources:
             add_top.append(add_top_tmp)
@@ -269,8 +261,8 @@ class ManticoreBackend(StorageBackend):
             search_query["sort"] = sort
 
         pprint(search_query)
-        results = run_query(
-            client,
+        results = self.run_query(
+            self.client,
             request.user,  # passed through run_main_query to filter_blacklisted
             search_query,
         )
diff --git a/core/db/opensearch.py b/core/db/opensearch.py
index b1c4df5..1d8b797 100644
--- a/core/db/opensearch.py
+++ b/core/db/opensearch.py
@@ -9,7 +9,7 @@ from core.db import StorageBackend
 
 # from json import dumps
 # pp = lambda x: print(dumps(x, indent=2))
-from core.db.processing import annotate_results, filter_blacklisted, parse_results
+from core.db.processing import annotate_results, parse_results
 from core.views.helpers import dedup_list
diff --git a/core/db/processing.py b/core/db/processing.py
index 2022897..e0634aa 100644
--- a/core/db/processing.py
+++ b/core/db/processing.py
@@ -1,7 +1,5 @@
 from datetime import datetime
 
-from django.conf import settings
-
 from core.lib.threshold import annotate_num_chans, annotate_num_users, annotate_online