From 202a13cccb48d5772c4b47ba6f31ce6939484916 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Tue, 27 Sep 2022 15:15:08 +0100 Subject: [PATCH] Begin implementing DB framework --- app/urls.py | 29 +- core/db/__init__.py | 234 ++++++++++++++ core/db/druid.py | 153 ++++++++++ core/db/manticore.py | 311 +++++++++++++++++++ core/db/opensearch.py | 485 +++++++++++++++++++++++++++++ core/{lib => db}/processing.py | 64 +--- core/db/storage.py | 21 ++ core/lib/manage/threshold.py | 30 -- core/lib/manticore.py | 362 ---------------------- core/lib/meta.py | 2 +- core/lib/nicktrace.py | 2 +- core/lib/opensearch.py | 487 ------------------------------ core/templates/base.html | 2 +- core/util/__init__.py | 0 core/util/logs.py | 69 +++++ core/views/ui/drilldown.py | 14 +- core/views/ui/insights.py | 2 +- core/views/ui/tables.py | 8 +- docker/prod/requirements.prod.txt | 3 - docker/requirements.dev.txt | 3 - requirements.txt | 3 - 21 files changed, 1316 insertions(+), 968 deletions(-) create mode 100644 core/db/__init__.py create mode 100644 core/db/druid.py create mode 100644 core/db/manticore.py create mode 100644 core/db/opensearch.py rename core/{lib => db}/processing.py (63%) create mode 100644 core/db/storage.py delete mode 100644 core/lib/manticore.py delete mode 100644 core/lib/opensearch.py create mode 100644 core/util/__init__.py create mode 100644 core/util/logs.py diff --git a/app/urls.py b/app/urls.py index 64cfa3f..bcfec50 100644 --- a/app/urls.py +++ b/app/urls.py @@ -63,14 +63,15 @@ from core.views.ui.drilldown import ( # DrilldownTableView,; Drilldown, DrilldownTableView, ThresholdInfoModal, ) -from core.views.ui.insights import ( - Insights, - InsightsChannels, - InsightsInfoModal, - InsightsMeta, - InsightsNicks, - InsightsSearch, -) + +# from core.views.ui.insights import ( +# Insights, +# InsightsChannels, +# InsightsInfoModal, +# InsightsMeta, +# InsightsNicks, +# InsightsSearch, +# ) urlpatterns = [ path("__debug__/", include("debug_toolbar.urls")), @@ -100,12 +101,12 @@ urlpatterns = [ path("context/", DrilldownContextModal.as_view(), name="modal_context"), path("context_table/", DrilldownContextModal.as_view(), name="modal_context_table"), ## - path("ui/insights/", Insights.as_view(), name="insights"), - path("ui/insights/search/", InsightsSearch.as_view(), name="search_insights"), - path("ui/insights/channels/", InsightsChannels.as_view(), name="chans_insights"), - path("ui/insights/nicks/", InsightsNicks.as_view(), name="nicks_insights"), - path("ui/insights/meta/", InsightsMeta.as_view(), name="meta_insights"), - path("ui/insights/modal/", InsightsInfoModal.as_view(), name="modal_insights"), + # path("ui/insights/", Insights.as_view(), name="insights"), + # path("ui/insights/search/", InsightsSearch.as_view(), name="search_insights"), + # path("ui/insights/channels/", InsightsChannels.as_view(), name="chans_insights"), + # path("ui/insights/nicks/", InsightsNicks.as_view(), name="nicks_insights"), + # path("ui/insights/meta/", InsightsMeta.as_view(), name="meta_insights"), + # path("ui/insights/modal/", InsightsInfoModal.as_view(), name="modal_insights"), ## path( "manage/threshold/irc/overview/", diff --git a/core/db/__init__.py b/core/db/__init__.py new file mode 100644 index 0000000..64b201f --- /dev/null +++ b/core/db/__init__.py @@ -0,0 +1,234 @@ +import random +import string +import time +from math import floor, log10 + +import orjson +from django.conf import settings +from siphashc import siphash + +from core import r +from core.db.processing import 
annotate_results
+from core.util import logs
+
+
+class StorageBackend(object):
+    def __init__(self, name):
+        self.log = logs.get_logger(name)
+        self.log.info(f"Initialising storage backend {name}")
+
+        self.initialise_caching()
+        self.initialise()
+
+    def initialise(self, **kwargs):
+        raise NotImplementedError
+
+    def initialise_caching(self):
+        hash_key = r.get("cache_hash_key")
+        if not hash_key:
+            letters = string.ascii_lowercase
+            hash_key = "".join(random.choice(letters) for i in range(16))
+            self.log.debug(f"Created new hash key: {hash_key}")
+            r.set("cache_hash_key", hash_key)
+        else:
+            hash_key = hash_key.decode("ascii")
+            self.log.debug(f"Decoded hash key: {hash_key}")
+        self.hash_key = hash_key
+
+    def construct_query(self, **kwargs):
+        raise NotImplementedError
+
+    def run_query(self, **kwargs):
+        raise NotImplementedError
+
+    def parse_size(self, query_params, sizes):
+        if "size" in query_params:
+            size = query_params["size"]
+            if size not in sizes:
+                message = "Size is not permitted"
+                message_class = "danger"
+                return {"message": message, "class": message_class}
+            size = int(size)
+        else:
+            size = 15
+
+        return size
+
+    def parse_index(self, user, query_params):
+        if "index" in query_params:
+            index = query_params["index"]
+            if index == "main":
+                index = settings.INDEX_MAIN
+            else:
+                if not user.has_perm(f"core.index_{index}"):
+                    message = "Not permitted to search by this index"
+                    message_class = "danger"
+                    return {
+                        "message": message,
+                        "class": message_class,
+                    }
+                if index == "meta":
+                    index = settings.INDEX_META
+                elif index == "internal":
+                    index = settings.INDEX_INT
+                else:
+                    message = "Index is not valid."
+                    message_class = "danger"
+                    return {
+                        "message": message,
+                        "class": message_class,
+                    }
+        else:
+            index = settings.INDEX_MAIN
+        return index
+
+    def parse_query(self, query_params, tags, size, index, custom_query, add_bool):
+        query_created = False
+        if "query" in query_params:
+            query = query_params["query"]
+            search_query = self.construct_query(query, size, index)
+            query_created = True
+        else:
+            if custom_query:
+                search_query = custom_query
+
+        if tags:
+            # Get a blank search query
+            if not query_created:
+                search_query = self.construct_query(None, size, index, blank=True)
+                query_created = True
+            for tagname, tagvalue in tags.items():
+                add_bool.append({tagname: tagvalue})
+
+        required_any = ["query", "tags"]
+        if not any([field in query_params.keys() for field in required_any]):
+            if not custom_query:
+                message = "Empty query!"
+                message_class = "warning"
+                return {"message": message, "class": message_class}
+
+        return search_query
+
+    def parse_source(self, user, query_params):
+        source = None
+        if "source" in query_params:
+            source = query_params["source"]
+
+            if source in settings.SOURCES_RESTRICTED:
+                if not user.has_perm("core.restricted_sources"):
+                    message = "Access denied"
+                    message_class = "danger"
+                    return {"message": message, "class": message_class}
+            elif source not in settings.MAIN_SOURCES:
+                message = "Invalid source"
+                message_class = "danger"
+                return {"message": message, "class": message_class}
+
+            if source == "all":
+                source = None  # the next block will populate it
+
+        if source:
+            sources = [source]
+        else:
+            sources = list(settings.MAIN_SOURCES)
+            if user.has_perm("core.restricted_sources"):
+                for source_iter in settings.SOURCES_RESTRICTED:
+                    sources.append(source_iter)
+
+        return sources
+
+    def filter_blacklisted(self, user, response):
+        """
+        Low level filter to take the raw OpenSearch response and remove
+        objects from it we want to keep secret.
+ Does not return, the object is mutated in place. + """ + response["redacted"] = 0 + response["exemption"] = None + if user.is_superuser: + response["exemption"] = True + # is_anonymous = isinstance(user, AnonymousUser) + # For every hit from ES + for index, item in enumerate(list(response["hits"]["hits"])): + # For every blacklisted type + for blacklisted_type in settings.OPENSEARCH_BLACKLISTED.keys(): + # Check this field we are matching exists + if "_source" in item.keys(): + data_index = "_source" + elif "fields" in item.keys(): + data_index = "fields" + else: + return False + if blacklisted_type in item[data_index].keys(): + content = item[data_index][blacklisted_type] + # For every item in the blacklisted array for the type + for blacklisted_item in settings.OPENSEARCH_BLACKLISTED[ + blacklisted_type + ]: + if blacklisted_item == str(content): + # Remove the item + if item in response["hits"]["hits"]: + # Let the UI know something was redacted + if ( + "exemption" + not in response["hits"]["hits"][index][data_index] + ): + response["redacted"] += 1 + # Anonymous + if user.is_anonymous: + # Just set it to none so the index is not off + response["hits"]["hits"][index] = None + else: + if not user.has_perm("core.bypass_blacklist"): + response["hits"]["hits"][index] = None + else: + response["hits"]["hits"][index][data_index][ + "exemption" + ] = True + + # Actually get rid of all the things we set to None + response["hits"]["hits"] = [hit for hit in response["hits"]["hits"] if hit] + + def query(self, user, search_query): + # For time tracking + start = time.process_time() + if settings.CACHE: + # Sort the keys so the hash is the same + query_normalised = orjson.dumps(search_query, option=orjson.OPT_SORT_KEYS) + hash = siphash(self.hash_key, query_normalised) + cache_hit = r.get(f"query_cache.{user.id}.{hash}") + if cache_hit: + response = orjson.loads(cache_hit) + response["cache"] = True + return response + response = self.run_query(user, search_query) + if "error" in response and len(response.keys()) == 1: + return response + # response = response.to_dict() + # print("RESP", response) + if "took" in response: + if response["took"] is None: + return None + self.filter_blacklisted(user, response) + + # Write cache + if settings.CACHE: + to_write_cache = orjson.dumps(response) + r.set(f"query_cache.{user.id}.{hash}", to_write_cache) + r.expire(f"query_cache.{user.id}.{hash}", settings.CACHE_TIMEOUT) + + # Parse the response + response_parsed = self.parse(response) + + time_took = (time.process_time() - start) * 1000 + # Round to 3 significant figures + time_took_rounded = round(time_took, 3 - int(floor(log10(abs(time_took)))) - 1) + return {"object_list": response_parsed, "took": time_took_rounded} + + def query_results(self, **kwargs): + raise NotImplementedError + + def process_results(self, **kwargs): + if kwargs.get("annotate"): + annotate_results(kwargs["results"]) + + def parse(self, response): + raise NotImplementedError diff --git a/core/db/druid.py b/core/db/druid.py new file mode 100644 index 0000000..153c984 --- /dev/null +++ b/core/db/druid.py @@ -0,0 +1,153 @@ +import logging +import random +import string +import time +from datetime import datetime +from math import floor, log10 +from pprint import pprint + +import orjson +import requests +from django.conf import settings +from siphashc import siphash + +from core import r +from core.db import StorageBackend +from core.db.processing import parse_druid +from core.views import helpers + +logger = logging.getLogger(__name__) 
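+# For reference, a minimal Druid "scan" query as built by construct_query()
+# below (values are illustrative; the datasource and the "msg" dimension are
+# whatever the caller passes in):
+#
+#   {
+#       "queryType": "scan",
+#       "dataSource": "main",
+#       "limit": 15,
+#       "intervals": ["1000-01-01/3000-01-01"],
+#       "filter": {
+#           "type": "and",
+#           "fields": [
+#               {
+#                   "type": "search",
+#                   "dimension": "msg",
+#                   "query": {"type": "insensitive_contains", "value": "hello"},
+#               }
+#           ],
+#       },
+#   }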
+
+
+class DruidBackend(StorageBackend):
+    def __init__(self):
+        super().__init__("Druid")
+
+    def initialise(self, **kwargs):
+        # self.client = PyDruid("http://broker:8082", "druid/v2")
+        pass  # we use requests
+
+    def construct_query(self, query, size, index, blank=False):
+        search_query = {
+            "limit": size,
+            "queryType": "scan",
+            "dataSource": index,
+            "filter": {
+                "type": "and",
+                "fields": [],
+            },
+            # "resultFormat": "list",
+            # "columns": [],
+            "intervals": ["1000-01-01/3000-01-01"],
+            # "batchSize": 20480,
+        }
+
+        to_add = {
+            "type": "search",
+            "dimension": "msg",
+            "query": {
+                "type": "insensitive_contains",
+                "value": query,
+            },
+        }
+
+        if blank:
+            return search_query
+        else:
+            search_query["filter"]["fields"].append(to_add)
+            return search_query
+
+    def parse(self, response):
+        parsed = parse_druid(response)
+        print("PARSE LEN", len(parsed))
+        return parsed
+
+    def run_query(self, user, search_query):
+        response = requests.post("http://broker:8082/druid/v2", json=search_query)
+        response = orjson.loads(response.text)
+        print("RESPONSE LEN", len(response))
+        ss = orjson.dumps(list(response), option=orjson.OPT_INDENT_2)
+        ss = ss.decode()
+        print(ss)
+        return response
+
+    def filter_blacklisted(self, user, response):
+        pass
+
+    def query_results(
+        self,
+        request,
+        query_params,
+        size=None,
+        annotate=True,
+        custom_query=False,
+        reverse=False,
+        dedup=False,
+        dedup_fields=None,
+        tags=None,
+    ):
+        add_bool = []
+        add_top = []
+
+        helpers.add_defaults(query_params)
+
+        # Check size
+        if request.user.is_anonymous:
+            sizes = settings.MAIN_SIZES_ANON
+        else:
+            sizes = settings.MAIN_SIZES
+        if not size:
+            size = self.parse_size(query_params, sizes)
+            if isinstance(size, dict):
+                return size
+
+        # Check index
+        index = self.parse_index(request.user, query_params)
+        if isinstance(index, dict):
+            return index
+
+        # Create the search query
+        search_query = self.parse_query(
+            query_params, tags, size, index, custom_query, add_bool
+        )
+        # parse_query returns an error dict with a "message" key on failure
+        if isinstance(search_query, dict) and "message" in search_query:
+            return search_query
+
+        sources = self.parse_source(request.user, query_params)
+        # TODO
+        add_top_tmp = {"bool": {"should": []}}
+        total_count = 0
+        for source_iter in sources:
+            add_top_tmp["bool"]["should"].append({"equals": {"src": source_iter}})
+            total_count += 1
+        total_sources = len(settings.MAIN_SOURCES) + len(
+            settings.SOURCES_RESTRICTED
+        )
+        if not total_count == total_sources:
+            add_top.append(add_top_tmp)
+
+        print("SIZE IS", size)
+
+        if add_bool:
+            self.add_bool(search_query, add_bool)
+
+        response = self.query(request.user, search_query)
+        # print("RESP", response)
+
+        # ss = orjson.dumps(list(response), option=orjson.OPT_INDENT_2)
+        # ss = ss.decode()
+        # print(ss)
+        # print("PARSED", results_parsed)
+        # return results_parsed
+        context = response
+        return context
+
+    def add_bool(self, search_query, add_bool):
+        if "filter" in search_query:
+            if "fields" in search_query["filter"]:
+                search_query["filter"]["fields"].append({"bool": {"should": add_bool}})
+            else:
+                search_query["filter"]["fields"] = [{"bool": {"should": add_bool}}]
+        else:
+            search_query["filter"] = {"bool": {"should": add_bool}}
diff --git a/core/db/manticore.py b/core/db/manticore.py
new file mode 100644
index 0000000..81211b5
--- /dev/null
+++ b/core/db/manticore.py
@@ -0,0 +1,311 @@
+import logging
+import random
+import string
+import time
+from datetime import datetime
+from math import floor, log10
+from pprint import pprint
+
+import orjson
+import requests
+from django.conf import settings
+
+from core import r
+from core.db import StorageBackend
+from core.db.processing import annotate_results, parse_results
+from core.views import helpers
+
+logger = logging.getLogger(__name__)
+
+
+class ManticoreBackend(StorageBackend):
+    def __init__(self):
+        super().__init__("Manticore")
+
+    def initialise(self, **kwargs):
+        """
+        Initialise the Manticore client
+        """
+        pass  # we use requests
+
+    def construct_query(self, query, size, index, blank=False):
+        """
+        Accept some query parameters and construct a Manticore query.
+        """
+        if not size:
+            size = 5
+        query_base = {
+            "index": index,
+            "limit": size,
+            "query": {"bool": {"must": []}},
+        }
+        query_string = {
+            "query_string": query,
+        }
+        if not blank:
+            query_base["query"]["bool"]["must"].append(query_string)
+        return query_base
+
+    def run_query(self, user, search_query):
+        response = requests.post(
+            f"{settings.MANTICORE_URL}/json/search", json=search_query
+        )
+        return orjson.loads(response.text)
+
+    def query_results(
+        self,
+        request,
+        query_params,
+        size=None,
+        annotate=True,
+        custom_query=False,
+        reverse=False,
+        dedup=False,
+        dedup_fields=None,
+        tags=None,
+    ):
+        query = None
+        message = None
+        message_class = None
+        add_bool = []
+        add_top = []
+        add_top_negative = []
+        sort = None
+        query_created = False
+        source = None
+        helpers.add_defaults(query_params)
+        # Check size
+        if request.user.is_anonymous:
+            sizes = settings.MANTICORE_MAIN_SIZES_ANON
+        else:
+            sizes = settings.MANTICORE_MAIN_SIZES
+        if not size:
+            if "size" in query_params:
+                size = query_params["size"]
+                if size not in sizes:
+                    message = "Size is not permitted"
+                    message_class = "danger"
+                    return {"message": message, "class": message_class}
+                size = int(size)
+            else:
+                size = 20
+
+        # Check index
+        if "index" in query_params:
+            index = query_params["index"]
+            if index == "main":
+                index = settings.MANTICORE_INDEX_MAIN
+            else:
+                if not request.user.has_perm(f"core.index_{index}"):
+                    message = "Not permitted to search by this index"
+                    message_class = "danger"
+                    return {
+                        "message": message,
+                        "class": message_class,
+                    }
+                if index == "meta":
+                    index = settings.MANTICORE_INDEX_META
+                elif index == "internal":
+                    index = settings.MANTICORE_INDEX_INT
+                else:
+                    message = "Index is not valid."
+                    message_class = "danger"
+                    return {
+                        "message": message,
+                        "class": message_class,
+                    }
+        else:
+            index = settings.MANTICORE_INDEX_MAIN
+
+        # Create the search query
+        if "query" in query_params:
+            query = query_params["query"]
+            search_query = self.construct_query(query, size, index)
+            query_created = True
+        else:
+            if custom_query:
+                search_query = custom_query
+
+        if tags:
+            # Get a blank search query
+            if not query_created:
+                search_query = self.construct_query(None, size, index, blank=True)
+                query_created = True
+            for tagname, tagvalue in tags.items():
+                add_bool.append({tagname: tagvalue})
+
+        required_any = ["query_full", "query", "tags"]
+        if not any([field in query_params.keys() for field in required_any]):
+            if not custom_query:
+                message = "Empty query!"
+ message_class = "warning" + return {"message": message, "class": message_class} + + # Check for a source + if "source" in query_params: + source = query_params["source"] + + if source in settings.SOURCES_RESTRICTED: + if not request.user.has_perm("core.restricted_sources"): + message = "Access denied" + message_class = "danger" + return {"message": message, "class": message_class} + elif source not in settings.MAIN_SOURCES: + message = "Invalid source" + message_class = "danger" + return {"message": message, "class": message_class} + + if source == "all": + source = None # the next block will populate it + + if source: + sources = [source] + else: + sources = list(settings.MAIN_SOURCES) + if request.user.has_perm("core.restricted_sources"): + for source_iter in settings.SOURCES_RESTRICTED: + sources.append(source_iter) + + add_top_tmp = {"bool": {"should": []}} + total_count = 0 + for source_iter in sources: + add_top_tmp["bool"]["should"].append({"equals": {"src": source_iter}}) + total_count += 1 + total_sources = len(settings.MAIN_SOURCES) + len( + settings.SOURCES_RESTRICTED + ) + if not total_count == total_sources: + add_top.append(add_top_tmp) + + # Date/time range + if set({"from_date", "to_date", "from_time", "to_time"}).issubset( + query_params.keys() + ): + from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z" + to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z" + from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ") + to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ") + from_ts = int(from_ts.timestamp()) + to_ts = int(to_ts.timestamp()) + range_query = { + "range": { + "ts": { + "gt": from_ts, + "lt": to_ts, + } + } + } + add_top.append(range_query) + + # Sorting + if "sorting" in query_params: + sorting = query_params["sorting"] + if sorting not in ("asc", "desc", "none"): + message = "Invalid sort" + message_class = "danger" + return {"message": message, "class": message_class} + if sorting in ("asc", "desc"): + sort = [ + { + "ts": { + "order": sorting, + } + } + ] + + # Sentiment handling + if "check_sentiment" in query_params: + if "sentiment_method" not in query_params: + message = "No sentiment method" + message_class = "danger" + return {"message": message, "class": message_class} + if "sentiment" in query_params: + sentiment = query_params["sentiment"] + try: + sentiment = float(sentiment) + except ValueError: + message = "Sentiment is not a float" + message_class = "danger" + return {"message": message, "class": message_class} + sentiment_method = query_params["sentiment_method"] + range_query_compare = {"range": {"sentiment": {}}} + range_query_precise = { + "match": { + "sentiment": None, + } + } + if sentiment_method == "below": + range_query_compare["range"]["sentiment"]["lt"] = sentiment + add_top.append(range_query_compare) + elif sentiment_method == "above": + range_query_compare["range"]["sentiment"]["gt"] = sentiment + add_top.append(range_query_compare) + elif sentiment_method == "exact": + range_query_precise["match"]["sentiment"] = sentiment + add_top.append(range_query_precise) + elif sentiment_method == "nonzero": + range_query_precise["match"]["sentiment"] = 0 + add_top_negative.append(range_query_precise) + + if add_bool: + # if "bool" not in search_query["query"]: + # search_query["query"]["bool"] = {} + # if "must" not in search_query["query"]["bool"]: + # search_query["query"]["bool"] = {"must": []} + + for item in add_bool: + search_query["query"]["bool"]["must"].append({"match": item}) + + if add_top: + for item 
in add_top:
+                search_query["query"]["bool"]["must"].append(item)
+        if add_top_negative:
+            for item in add_top_negative:
+                if "must_not" in search_query["query"]["bool"]:
+                    search_query["query"]["bool"]["must_not"].append(item)
+                else:
+                    search_query["query"]["bool"]["must_not"] = [item]
+        if sort:
+            search_query["sort"] = sort
+
+        pprint(search_query)
+        results = self.run_query(request.user, search_query)
+        if not results:
+            message = "Error running query"
+            message_class = "danger"
+            return {"message": message, "class": message_class}
+        # results = results.to_dict()
+        if "error" in results:
+            message = results["error"]
+            message_class = "danger"
+            return {"message": message, "class": message_class}
+        results_parsed = parse_results(results)
+        if annotate:
+            annotate_results(results_parsed)
+        if "dedup" in query_params:
+            if query_params["dedup"] == "on":
+                dedup = True
+            else:
+                dedup = False
+        else:
+            dedup = False
+
+        if reverse:
+            results_parsed = results_parsed[::-1]
+
+        if dedup:
+            if not dedup_fields:
+                dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
+            results_parsed = helpers.dedup_list(results_parsed, dedup_fields)
+        context = {
+            "object_list": results_parsed,
+            "card": results["hits"]["total"],
+            "took": results["took"],
+        }
+        if "cache" in results:
+            context["cache"] = results["cache"]
+        return context
diff --git a/core/db/opensearch.py b/core/db/opensearch.py
new file mode 100644
index 0000000..b1c4df5
--- /dev/null
+++ b/core/db/opensearch.py
@@ -0,0 +1,485 @@
+# from copy import deepcopy
+# from datetime import datetime, timedelta
+
+from django.conf import settings
+from opensearchpy import OpenSearch
+from opensearchpy.exceptions import NotFoundError, RequestError
+
+from core.db import StorageBackend
+
+# from json import dumps
+# pp = lambda x: print(dumps(x, indent=2))
+from core.db.processing import annotate_results, parse_results
+from core.views.helpers import dedup_list
+
+
+class OpensearchBackend(StorageBackend):
+    def __init__(self):
+        super().__init__("Opensearch")
+
+    def initialise(self, **kwargs):
+        """
+        Initialise the OpenSearch API endpoint.
+        """
+        auth = (settings.OPENSEARCH_USERNAME, settings.OPENSEARCH_PASSWORD)
+        client = OpenSearch(
+            # fmt: off
+            hosts=[{"host": settings.OPENSEARCH_URL,
+                    "port": settings.OPENSEARCH_PORT}],
+            http_compress=False,  # gzip compression for request bodies
+            http_auth=auth,
+            # client_cert = client_cert_path,
+            # client_key = client_key_path,
+            use_ssl=settings.OPENSEARCH_TLS,
+            verify_certs=False,
+            ssl_assert_hostname=False,
+            ssl_show_warn=False,
+            # a_certs=ca_certs_path,
+        )
+        self.client = client
+
+    def construct_query(self, query, size, use_query_string=True, tokens=False):
+        """
+        Accept some query parameters and construct an OpenSearch query.
+ """ + if not size: + size = 5 + query_base = { + "size": size, + "query": {"bool": {"must": []}}, + } + query_string = { + "query_string": { + "query": query, + # "fields": fields, + # "default_field": "msg", + # "type": "best_fields", + "fuzziness": "AUTO", + "fuzzy_transpositions": True, + "fuzzy_max_expansions": 50, + "fuzzy_prefix_length": 0, + # "minimum_should_match": 1, + "default_operator": "or", + "analyzer": "standard", + "lenient": True, + "boost": 1, + "allow_leading_wildcard": True, + # "enable_position_increments": False, + "phrase_slop": 3, + # "max_determinized_states": 10000, + "quote_field_suffix": "", + "quote_analyzer": "standard", + "analyze_wildcard": False, + "auto_generate_synonyms_phrase_query": True, + } + } + query_tokens = { + "simple_query_string": { + # "tokens": query, + "query": query, + "fields": ["tokens"], + "flags": "ALL", + "fuzzy_transpositions": True, + "fuzzy_max_expansions": 50, + "fuzzy_prefix_length": 0, + "default_operator": "and", + "analyzer": "standard", + "lenient": True, + "boost": 1, + "quote_field_suffix": "", + "analyze_wildcard": False, + "auto_generate_synonyms_phrase_query": False, + } + } + if tokens: + query_base["query"]["bool"]["must"].append(query_tokens) + # query["query"]["bool"]["must"].append(query_string) + # query["query"]["bool"]["must"][0]["query_string"]["fields"] = ["tokens"] + elif use_query_string: + query_base["query"]["bool"]["must"].append(query_string) + return query_base + + def run_query(self, client, user, query, custom_query=False, index=None, size=None): + """ + Low level helper to run an ES query. + Accept a user to pass it to the filter, so we can + avoid filtering for superusers. + Accept fields and size, for the fields we want to match and the + number of results to return. + """ + if not index: + index = settings.INDEX_MAIN + if custom_query: + search_query = query + else: + search_query = self.construct_query(query, size) + try: + response = client.search(body=search_query, index=index) + except RequestError as err: + print("OpenSearch error", err) + return err + except NotFoundError as err: + print("OpenSearch error", err) + return err + return response + + def query_results( + self, + request, + query_params, + size=None, + annotate=True, + custom_query=False, + reverse=False, + dedup=False, + dedup_fields=None, + lookup_hashes=True, + tags=None, + ): + """ + API helper to alter the OpenSearch return format into something + a bit better to parse. + Accept a HTTP request object. Run the query, and annotate the + results with the other data we have. 
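+        Returns a context dict containing object_list, card and took on
+        success, or a dict with "message" and "class" keys on error.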
+ """ + # is_anonymous = isinstance(request.user, AnonymousUser) + query = None + message = None + message_class = None + add_bool = [] + add_top = [] + add_top_negative = [] + sort = None + query_created = False + + # Lookup the hash values but don't disclose them to the user + # denied = [] + # if lookup_hashes: + # if settings.HASHING: + # query_params = deepcopy(query_params) + # denied_q = hash_lookup(request.user, query_params) + # denied.extend(denied_q) + # if tags: + # denied_t = hash_lookup(request.user, tags, query_params) + # denied.extend(denied_t) + + # message = "Permission denied: " + # for x in denied: + # if isinstance(x, SearchDenied): + # message += f"Search({x.key}: {x.value}) " + # elif isinstance(x, LookupDenied): + # message += f"Lookup({x.key}: {x.value}) " + # if denied: + # # message = [f"{i}" for i in message] + # # message = "\n".join(message) + # message_class = "danger" + # return {"message": message, "class": message_class} + + if request.user.is_anonymous: + sizes = settings.MAIN_SIZES_ANON + else: + sizes = settings.MAIN_SIZES + if not size: + if "size" in query_params: + size = query_params["size"] + if size not in sizes: + message = "Size is not permitted" + message_class = "danger" + return {"message": message, "class": message_class} + else: + size = 20 + source = None + if "source" in query_params: + source = query_params["source"] + + if source in settings.SOURCES_RESTRICTED: + if not request.user.has_perm("core.restricted_sources"): + message = "Access denied" + message_class = "danger" + return {"message": message, "class": message_class} + elif source not in settings.MAIN_SOURCES: + message = "Invalid source" + message_class = "danger" + return {"message": message, "class": message_class} + + if source == "all": + source = None # the next block will populate it + + if source: + sources = [source] + else: + sources = settings.MAIN_SOURCES + if request.user.has_perm("core.restricted_sources"): + for source_iter in settings.SOURCES_RESTRICTED: + sources.append(source_iter) + + add_top_tmp = {"bool": {"should": []}} + for source_iter in sources: + add_top_tmp["bool"]["should"].append({"match_phrase": {"src": source_iter}}) + add_top.append(add_top_tmp) + + # date_query = False + if set({"from_date", "to_date", "from_time", "to_time"}).issubset( + query_params.keys() + ): + from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z" + to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z" + range_query = { + "range": { + "ts": { + "gt": from_ts, + "lt": to_ts, + } + } + } + add_top.append(range_query) + + # if date_query: + # if settings.DELAY_RESULTS: + # if source not in settings.SAFE_SOURCES: + # if request.user.has_perm("core.bypass_delay"): + # add_top.append(range_query) + # else: + # delay_as_ts = datetime.now() - timedelta( + # days=settings.DELAY_DURATION + # ) + # lt_as_ts = datetime.strptime( + # range_query["range"]["ts"]["lt"], "%Y-%m-%dT%H:%MZ" + # ) + # if lt_as_ts > delay_as_ts: + # range_query["range"]["ts"][ + # "lt" + # ] = f"now-{settings.DELAY_DURATION}d" + # add_top.append(range_query) + # else: + # add_top.append(range_query) + # else: + # if settings.DELAY_RESULTS: + # if source not in settings.SAFE_SOURCES: + # if not request.user.has_perm("core.bypass_delay"): + # range_query = { + # "range": { + # "ts": { + # # "gt": , + # "lt": f"now-{settings.DELAY_DURATION}d", + # } + # } + # } + # add_top.append(range_query) + + if "sorting" in query_params: + sorting = query_params["sorting"] + if sorting not in 
("asc", "desc", "none"): + message = "Invalid sort" + message_class = "danger" + return {"message": message, "class": message_class} + if sorting in ("asc", "desc"): + sort = [ + { + "ts": { + "order": sorting, + } + } + ] + + if "check_sentiment" in query_params: + if "sentiment_method" not in query_params: + message = "No sentiment method" + message_class = "danger" + return {"message": message, "class": message_class} + if "sentiment" in query_params: + sentiment = query_params["sentiment"] + try: + sentiment = float(sentiment) + except ValueError: + message = "Sentiment is not a float" + message_class = "danger" + return {"message": message, "class": message_class} + sentiment_method = query_params["sentiment_method"] + range_query_compare = {"range": {"sentiment": {}}} + range_query_precise = { + "match": { + "sentiment": None, + } + } + if sentiment_method == "below": + range_query_compare["range"]["sentiment"]["lt"] = sentiment + add_top.append(range_query_compare) + elif sentiment_method == "above": + range_query_compare["range"]["sentiment"]["gt"] = sentiment + add_top.append(range_query_compare) + elif sentiment_method == "exact": + range_query_precise["match"]["sentiment"] = sentiment + add_top.append(range_query_precise) + elif sentiment_method == "nonzero": + range_query_precise["match"]["sentiment"] = 0 + add_top_negative.append(range_query_precise) + + # Only one of query or query_full can be active at once + # We prefer query because it's simpler + if "query" in query_params: + query = query_params["query"] + search_query = self.construct_query(query, size, tokens=True) + query_created = True + elif "query_full" in query_params: + query_full = query_params["query_full"] + # if request.user.has_perm("core.query_search"): + search_query = self.construct_query(query_full, size) + query_created = True + # else: + # message = "You cannot search by query string" + # message_class = "danger" + # return {"message": message, "class": message_class} + else: + if custom_query: + search_query = custom_query + + if tags: + # Get a blank search query + if not query_created: + search_query = self.construct_query(None, size, use_query_string=False) + query_created = True + for tagname, tagvalue in tags.items(): + add_bool.append({tagname: tagvalue}) + + required_any = ["query_full", "query", "tags"] + if not any([field in query_params.keys() for field in required_any]): + if not custom_query: + message = "Empty query!" 
+ message_class = "warning" + return {"message": message, "class": message_class} + + if add_bool: + # if "bool" not in search_query["query"]: + # search_query["query"]["bool"] = {} + # if "must" not in search_query["query"]["bool"]: + # search_query["query"]["bool"] = {"must": []} + + for item in add_bool: + search_query["query"]["bool"]["must"].append({"match_phrase": item}) + if add_top: + for item in add_top: + search_query["query"]["bool"]["must"].append(item) + if add_top_negative: + for item in add_top_negative: + if "must_not" in search_query["query"]["bool"]: + search_query["query"]["bool"]["must_not"].append(item) + else: + search_query["query"]["bool"]["must_not"] = [item] + if sort: + search_query["sort"] = sort + + if "index" in query_params: + index = query_params["index"] + if index == "main": + index = settings.INDEX_MAIN + else: + if not request.user.has_perm(f"core.index_{index}"): + message = "Not permitted to search by this index" + message_class = "danger" + return { + "message": message, + "class": message_class, + } + if index == "meta": + index = settings.INDEX_META + elif index == "internal": + index = settings.INDEX_INT + else: + message = "Index is not valid." + message_class = "danger" + return { + "message": message, + "class": message_class, + } + + else: + index = settings.INDEX_MAIN + + results = self.query( + request.user, # passed through run_main_query to filter_blacklisted + search_query, + custom_query=True, + index=index, + size=size, + ) + if not results: + return False + if isinstance(results, Exception): + message = f"Error: {results.info['error']['root_cause'][0]['type']}" + message_class = "danger" + return {"message": message, "class": message_class} + if len(results["hits"]["hits"]) == 0: + message = "No results." + message_class = "danger" + return {"message": message, "class": message_class} + + results_parsed = parse_results(results) + + if annotate: + annotate_results(results_parsed) + if "dedup" in query_params: + if query_params["dedup"] == "on": + dedup = True + else: + dedup = False + else: + dedup = False + + if reverse: + results_parsed = results_parsed[::-1] + + if dedup: + if not dedup_fields: + dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"] + results_parsed = dedup_list(results_parsed, dedup_fields) + + # if source not in settings.SAFE_SOURCES: + # if settings.ENCRYPTION: + # encrypt_list(request.user, results_parsed, settings.ENCRYPTION_KEY) + + # if settings.HASHING: + # hash_list(request.user, results_parsed) + + # if settings.OBFUSCATION: + # obfuscate_list(request.user, results_parsed) + + # if settings.RANDOMISATION: + # randomise_list(request.user, results_parsed) + + # process_list(results) + + # IMPORTANT! - DO NOT PASS query_params to the user! 
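+        # Rough shape of the context handed back to the view (sketch):
+        #   {"object_list": [...], "card": <total hits>, "took": <ms>,
+        #    "redacted": <count>, "exemption": True/None, "query": "..."}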
+ context = { + "object_list": results_parsed, + "card": results["hits"]["total"]["value"], + "took": results["took"], + } + if "redacted" in results: + context["redacted"] = results["redacted"] + if "exemption" in results: + context["exemption"] = results["exemption"] + if query: + context["query"] = query + # if settings.DELAY_RESULTS: + # if source not in settings.SAFE_SOURCES: + # if not request.user.has_perm("core.bypass_delay"): + # context["delay"] = settings.DELAY_DURATION + # if settings.RANDOMISATION: + # if source not in settings.SAFE_SOURCES: + # if not request.user.has_perm("core.bypass_randomisation"): + # context["randomised"] = True + return context + + def query_single_result(self, request, query_params): + context = self.query_results(request, query_params, size=100) + + if not context: + return {"message": "Failed to run query", "message_class": "danger"} + if "message" in context: + return context + dedup_set = {item["nick"] for item in context["object_list"]} + if dedup_set: + context["item"] = context["object_list"][0] + + return context diff --git a/core/lib/processing.py b/core/db/processing.py similarity index 63% rename from core/lib/processing.py rename to core/db/processing.py index 243770e..2022897 100644 --- a/core/lib/processing.py +++ b/core/db/processing.py @@ -60,59 +60,6 @@ def annotate_results(results_parsed): item["num_chans"] = num_chans[item["nick"]] -def filter_blacklisted(user, response): - """ - Low level filter to take the raw OpenSearch response and remove - objects from it we want to keep secret. - Does not return, the object is mutated in place. - """ - response["redacted"] = 0 - response["exemption"] = None - if user.is_superuser: - response["exemption"] = True - # is_anonymous = isinstance(user, AnonymousUser) - # For every hit from ES - for index, item in enumerate(list(response["hits"]["hits"])): - # For every blacklisted type - for blacklisted_type in settings.OPENSEARCH_BLACKLISTED.keys(): - # Check this field we are matching exists - if "_source" in item.keys(): - data_index = "_source" - elif "fields" in item.keys(): - data_index = "fields" - else: - return False - if blacklisted_type in item[data_index].keys(): - content = item[data_index][blacklisted_type] - # For every item in the blacklisted array for the type - for blacklisted_item in settings.OPENSEARCH_BLACKLISTED[ - blacklisted_type - ]: - if blacklisted_item == str(content): - # Remove the item - if item in response["hits"]["hits"]: - # Let the UI know something was redacted - if ( - "exemption" - not in response["hits"]["hits"][index][data_index] - ): - response["redacted"] += 1 - # Anonymous - if user.is_anonymous: - # Just set it to none so the index is not off - response["hits"]["hits"][index] = None - else: - if not user.has_perm("core.bypass_blacklist"): - response["hits"]["hits"][index] = None - else: - response["hits"]["hits"][index][data_index][ - "exemption" - ] = True - - # Actually get rid of all the things we set to None - response["hits"]["hits"] = [hit for hit in response["hits"]["hits"] if hit] - - def parse_results(results): results_parsed = [] stringify = ["host", "channel"] @@ -166,3 +113,14 @@ def parse_results(results): element["time"] = time results_parsed.append(element) return results_parsed + + +def parse_druid(response): + results_parsed = [] + for item in response: + if "events" in item: + for event in item["events"]: + results_parsed.append(event) + else: + raise Exception(f"events not in item {item}") + return results_parsed diff --git 
a/core/db/storage.py b/core/db/storage.py new file mode 100644 index 0000000..06699df --- /dev/null +++ b/core/db/storage.py @@ -0,0 +1,21 @@ +from django.conf import settings + + +def get_db(): + if settings.DB_BACKEND == "DRUID": + from core.db.druid import DruidBackend + + return DruidBackend() + elif settings.DB_BACKEND == "OPENSEARCH": + from core.db.opensearch import OpensearchBackend + + return OpensearchBackend() + elif settings.DB_BACKEND == "MANTICORE": + from core.db.manticore import ManticoreBackend + + return ManticoreBackend() + else: + raise Exception("Invalid DB backend") + + +db = get_db() diff --git a/core/lib/manage/threshold.py b/core/lib/manage/threshold.py index f646817..070c3f9 100644 --- a/core/lib/manage/threshold.py +++ b/core/lib/manage/threshold.py @@ -1,6 +1,5 @@ from django.conf import settings -from core.lib.opensearch import client, run_main_query from core.lib.threshold import threshold_request @@ -162,35 +161,6 @@ def construct_alert_query(): return query -def get_irc_alerts(user): - query = construct_alert_query() - results = run_main_query( - client, - user, # passed through run_main_query to filter_blacklisted - query, - custom_query=True, - index=settings.OPENSEARCH_INDEX_INT, - ) - if not results: - return [] - results_parsed = [] - if "hits" in results.keys(): - if "hits" in results["hits"]: - for item in results["hits"]["hits"]: - element = item["_source"] - element["id"] = item["_id"] - - # Split the timestamp into date and time - ts = element["ts"] - ts_spl = ts.split("T") - date = ts_spl[0] - time = ts_spl[1] - element["date"] = date - element["time"] = time - results_parsed.append(element) - return results_parsed - - def send_irc_message(net, num, channel, msg, nick=None): url = f"irc/msg/{net}/{num}" payload = {"msg": msg, "channel": channel} diff --git a/core/lib/manticore.py b/core/lib/manticore.py deleted file mode 100644 index 6563173..0000000 --- a/core/lib/manticore.py +++ /dev/null @@ -1,362 +0,0 @@ -import logging -import random -import string -import time -from datetime import datetime -from math import floor, log10 -from pprint import pprint - -import manticoresearch -import requests -import ujson -from django.conf import settings -from siphashc import siphash - -from core import r -from core.lib.processing import annotate_results, filter_blacklisted, parse_results -from core.views import helpers - -logger = logging.getLogger(__name__) - - -def initialise_manticore(): - """ - Initialise the Manticore client - """ - configuration = manticoresearch.Configuration(host=settings.MANTICORE_URL) - api_client = manticoresearch.ApiClient(configuration) - api_instance = manticoresearch.SearchApi(api_client) - - return (api_client, api_instance) - - -api_client, client = initialise_manticore() - - -def initialise_caching(): - hash_key = r.get("cache_hash_key") - if not hash_key: - letters = string.ascii_lowercase - hash_key = "".join(random.choice(letters) for i in range(16)) - logger.debug(f"Created new hash key: {hash_key}") - r.set("cache_hash_key", hash_key) - else: - hash_key = hash_key.decode("ascii") - logger.debug(f"Decoded hash key: {hash_key}") - return hash_key - - -hash_key = initialise_caching() - - -def construct_query(query, size, index, blank=False): - """ - Accept some query parameters and construct an OpenSearch query. 
- """ - if not size: - size = 5 - query_base = { - "index": index, - "limit": size, - "query": {"bool": {"must": []}}, - } - query_string = { - "query_string": query, - } - if not blank: - query_base["query"]["bool"]["must"].append(query_string) - return query_base - - -def run_query(client, user, search_query): - if settings.MANTICORE_CACHE: - start = time.process_time() - query_normalised = ujson.dumps(search_query, sort_keys=True) - hash = siphash(hash_key, query_normalised) - cache_hit = r.get(f"query_cache.{user.id}.{hash}") - if cache_hit: - response = ujson.loads(cache_hit) - time_took = (time.process_time() - start) * 1000 - # Round to 3 significant figures - time_took_rounded = round( - time_took, 3 - int(floor(log10(abs(time_took)))) - 1 - ) - response["took"] = time_took_rounded - response["cache"] = True - return response - # response = client.search(search_query) - response = requests.post(f"{settings.MANTICORE_URL}/json/search", json=search_query) - response = ujson.loads(response.text) - if "error" in response and len(response.keys()) == 1: - return response - # response = response.to_dict() - #print("RESP", response) - if "took" in response: - if response["took"] is None: - return None - filter_blacklisted(user, response) - - # Write cache - if settings.MANTICORE_CACHE: - to_write_cache = ujson.dumps(response) - r.set(f"query_cache.{user.id}.{hash}", to_write_cache) - r.expire(f"query_cache.{user.id}.{hash}", settings.MANTICORE_CACHE_TIMEOUT) - return response - - -def query_results( - request, - query_params, - size=None, - annotate=True, - custom_query=False, - reverse=False, - dedup=False, - dedup_fields=None, - tags=None, -): - query = None - message = None - message_class = None - add_bool = [] - add_top = [] - add_top_negative = [] - sort = None - query_created = False - source = None - helpers.add_defaults(query_params) - # Check size - if request.user.is_anonymous: - sizes = settings.MANTICORE_MAIN_SIZES_ANON - else: - sizes = settings.MANTICORE_MAIN_SIZES - if not size: - if "size" in query_params: - size = query_params["size"] - if size not in sizes: - message = "Size is not permitted" - message_class = "danger" - return {"message": message, "class": message_class} - size = int(size) - else: - size = 20 - - # Check index - if "index" in query_params: - index = query_params["index"] - if index == "main": - index = settings.MANTICORE_INDEX_MAIN - else: - if not request.user.has_perm(f"core.index_{index}"): - message = "Not permitted to search by this index" - message_class = "danger" - return { - "message": message, - "class": message_class, - } - if index == "meta": - index = settings.MANTICORE_INDEX_META - elif index == "internal": - index = settings.MANTICORE_INDEX_INT - else: - message = "Index is not valid." 
- message_class = "danger" - return { - "message": message, - "class": message_class, - } - else: - index = settings.MANTICORE_INDEX_MAIN - - # Create the search query - if "query" in query_params: - query = query_params["query"] - search_query = construct_query(query, size, index) - query_created = True - else: - if custom_query: - search_query = custom_query - - if tags: - # Get a blank search query - if not query_created: - search_query = construct_query(None, size, index, blank=True) - query_created = True - for tagname, tagvalue in tags.items(): - add_bool.append({tagname: tagvalue}) - - required_any = ["query_full", "query", "tags"] - if not any([field in query_params.keys() for field in required_any]): - if not custom_query: - message = "Empty query!" - message_class = "warning" - return {"message": message, "class": message_class} - - # Check for a source - if "source" in query_params: - source = query_params["source"] - - if source in settings.MANTICORE_SOURCES_RESTRICTED: - if not request.user.has_perm("core.restricted_sources"): - message = "Access denied" - message_class = "danger" - return {"message": message, "class": message_class} - elif source not in settings.MANTICORE_MAIN_SOURCES: - message = "Invalid source" - message_class = "danger" - return {"message": message, "class": message_class} - - if source == "all": - source = None # the next block will populate it - - if source: - sources = [source] - else: - sources = list(settings.MANTICORE_MAIN_SOURCES) - if request.user.has_perm("core.restricted_sources"): - for source_iter in settings.MANTICORE_SOURCES_RESTRICTED: - sources.append(source_iter) - - add_top_tmp = {"bool": {"should": []}} - total_count = 0 - for source_iter in sources: - add_top_tmp["bool"]["should"].append({"equals": {"src": source_iter}}) - total_count += 1 - total_sources = len(settings.MANTICORE_MAIN_SOURCES) + len( - settings.MANTICORE_SOURCES_RESTRICTED - ) - if not total_count == total_sources: - add_top.append(add_top_tmp) - - # Date/time range - if set({"from_date", "to_date", "from_time", "to_time"}).issubset( - query_params.keys() - ): - from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z" - to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z" - from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ") - to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ") - from_ts = int(from_ts.timestamp()) - to_ts = int(to_ts.timestamp()) - range_query = { - "range": { - "ts": { - "gt": from_ts, - "lt": to_ts, - } - } - } - add_top.append(range_query) - - # Sorting - if "sorting" in query_params: - sorting = query_params["sorting"] - if sorting not in ("asc", "desc", "none"): - message = "Invalid sort" - message_class = "danger" - return {"message": message, "class": message_class} - if sorting in ("asc", "desc"): - sort = [ - { - "ts": { - "order": sorting, - } - } - ] - - # Sentiment handling - if "check_sentiment" in query_params: - if "sentiment_method" not in query_params: - message = "No sentiment method" - message_class = "danger" - return {"message": message, "class": message_class} - if "sentiment" in query_params: - sentiment = query_params["sentiment"] - try: - sentiment = float(sentiment) - except ValueError: - message = "Sentiment is not a float" - message_class = "danger" - return {"message": message, "class": message_class} - sentiment_method = query_params["sentiment_method"] - range_query_compare = {"range": {"sentiment": {}}} - range_query_precise = { - "match": { - "sentiment": None, - } - } - if sentiment_method 
== "below": - range_query_compare["range"]["sentiment"]["lt"] = sentiment - add_top.append(range_query_compare) - elif sentiment_method == "above": - range_query_compare["range"]["sentiment"]["gt"] = sentiment - add_top.append(range_query_compare) - elif sentiment_method == "exact": - range_query_precise["match"]["sentiment"] = sentiment - add_top.append(range_query_precise) - elif sentiment_method == "nonzero": - range_query_precise["match"]["sentiment"] = 0 - add_top_negative.append(range_query_precise) - - if add_bool: - # if "bool" not in search_query["query"]: - # search_query["query"]["bool"] = {} - # if "must" not in search_query["query"]["bool"]: - # search_query["query"]["bool"] = {"must": []} - - for item in add_bool: - search_query["query"]["bool"]["must"].append({"match": item}) - - if add_top: - for item in add_top: - search_query["query"]["bool"]["must"].append(item) - if add_top_negative: - for item in add_top_negative: - if "must_not" in search_query["query"]["bool"]: - search_query["query"]["bool"]["must_not"].append(item) - else: - search_query["query"]["bool"]["must_not"] = [item] - if sort: - search_query["sort"] = sort - - pprint(search_query) - results = run_query( - client, - request.user, # passed through run_main_query to filter_blacklisted - search_query, - ) - if not results: - message = "Error running query" - message_class = "danger" - return {"message": message, "class": message_class} - # results = results.to_dict() - if "error" in results: - message = results["error"] - message_class = "danger" - return {"message": message, "class": message_class} - results_parsed = parse_results(results) - if annotate: - annotate_results(results_parsed) - if "dedup" in query_params: - if query_params["dedup"] == "on": - dedup = True - else: - dedup = False - else: - dedup = False - - if reverse: - results_parsed = results_parsed[::-1] - - if dedup: - if not dedup_fields: - dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"] - results_parsed = helpers.dedup_list(results_parsed, dedup_fields) - context = { - "object_list": results_parsed, - "card": results["hits"]["total"], - "took": results["took"], - } - if "cache" in results: - context["cache"] = results["cache"] - return context diff --git a/core/lib/meta.py b/core/lib/meta.py index 5a368ea..91a367b 100644 --- a/core/lib/meta.py +++ b/core/lib/meta.py @@ -3,7 +3,7 @@ from math import ceil from django.conf import settings from numpy import array_split -from core.lib.opensearch import client, run_main_query +from core.db.opensearch import client, run_main_query def construct_query(net, nicks): diff --git a/core/lib/nicktrace.py b/core/lib/nicktrace.py index e7022ae..41e4b2d 100644 --- a/core/lib/nicktrace.py +++ b/core/lib/nicktrace.py @@ -3,7 +3,7 @@ from math import ceil from django.conf import settings from numpy import array_split -from core.lib.opensearch import client, run_main_query +from core.lib.druid import client, run_main_query def construct_query(net, nicks): diff --git a/core/lib/opensearch.py b/core/lib/opensearch.py deleted file mode 100644 index ec8fd44..0000000 --- a/core/lib/opensearch.py +++ /dev/null @@ -1,487 +0,0 @@ -# from copy import deepcopy -# from datetime import datetime, timedelta - -from django.conf import settings -from opensearchpy import OpenSearch -from opensearchpy.exceptions import NotFoundError, RequestError - -# from json import dumps -# pp = lambda x: print(dumps(x, indent=2)) -from core.lib.processing import annotate_results, filter_blacklisted, parse_results -from 
core.views.helpers import dedup_list - - -def initialise_opensearch(): - """ - Inititialise the OpenSearch API endpoint. - """ - auth = (settings.OPENSEARCH_USERNAME, settings.OPENSEARCH_PASSWORD) - client = OpenSearch( - # fmt: off - hosts=[{"host": settings.OPENSEARCH_URL, - "port": settings.OPENSEARCH_PORT}], - http_compress=False, # enables gzip compression for request bodies - http_auth=auth, - # client_cert = client_cert_path, - # client_key = client_key_path, - use_ssl=settings.OPENSEARCH_TLS, - verify_certs=False, - ssl_assert_hostname=False, - ssl_show_warn=False, - # a_certs=ca_certs_path, - ) - return client - - -client = initialise_opensearch() - - -def construct_query(query, size, use_query_string=True, tokens=False): - """ - Accept some query parameters and construct an OpenSearch query. - """ - if not size: - size = 5 - query_base = { - "size": size, - "query": {"bool": {"must": []}}, - } - query_string = { - "query_string": { - "query": query, - # "fields": fields, - # "default_field": "msg", - # "type": "best_fields", - "fuzziness": "AUTO", - "fuzzy_transpositions": True, - "fuzzy_max_expansions": 50, - "fuzzy_prefix_length": 0, - # "minimum_should_match": 1, - "default_operator": "or", - "analyzer": "standard", - "lenient": True, - "boost": 1, - "allow_leading_wildcard": True, - # "enable_position_increments": False, - "phrase_slop": 3, - # "max_determinized_states": 10000, - "quote_field_suffix": "", - "quote_analyzer": "standard", - "analyze_wildcard": False, - "auto_generate_synonyms_phrase_query": True, - } - } - query_tokens = { - "simple_query_string": { - # "tokens": query, - "query": query, - "fields": ["tokens"], - "flags": "ALL", - "fuzzy_transpositions": True, - "fuzzy_max_expansions": 50, - "fuzzy_prefix_length": 0, - "default_operator": "and", - "analyzer": "standard", - "lenient": True, - "boost": 1, - "quote_field_suffix": "", - "analyze_wildcard": False, - "auto_generate_synonyms_phrase_query": False, - } - } - if tokens: - query_base["query"]["bool"]["must"].append(query_tokens) - # query["query"]["bool"]["must"].append(query_string) - # query["query"]["bool"]["must"][0]["query_string"]["fields"] = ["tokens"] - elif use_query_string: - query_base["query"]["bool"]["must"].append(query_string) - return query_base - - -def run_main_query(client, user, query, custom_query=False, index=None, size=None): - """ - Low level helper to run an ES query. - Accept a user to pass it to the filter, so we can - avoid filtering for superusers. - Accept fields and size, for the fields we want to match and the - number of results to return. - """ - if not index: - index = settings.OPENSEARCH_INDEX_MAIN - if custom_query: - search_query = query - else: - search_query = construct_query(query, size) - try: - response = client.search(body=search_query, index=index) - except RequestError as err: - print("OpenSearch error", err) - return err - except NotFoundError as err: - print("OpenSearch error", err) - return err - filter_blacklisted(user, response) - return response - - -def query_results( - request, - query_params, - size=None, - annotate=True, - custom_query=False, - reverse=False, - dedup=False, - dedup_fields=None, - lookup_hashes=True, - tags=None, -): - """ - API helper to alter the OpenSearch return format into something - a bit better to parse. - Accept a HTTP request object. Run the query, and annotate the - results with the other data we have. 
- """ - # is_anonymous = isinstance(request.user, AnonymousUser) - query = None - message = None - message_class = None - add_bool = [] - add_top = [] - add_top_negative = [] - sort = None - query_created = False - - # Lookup the hash values but don't disclose them to the user - # denied = [] - # if lookup_hashes: - # if settings.HASHING: - # query_params = deepcopy(query_params) - # denied_q = hash_lookup(request.user, query_params) - # denied.extend(denied_q) - # if tags: - # denied_t = hash_lookup(request.user, tags, query_params) - # denied.extend(denied_t) - - # message = "Permission denied: " - # for x in denied: - # if isinstance(x, SearchDenied): - # message += f"Search({x.key}: {x.value}) " - # elif isinstance(x, LookupDenied): - # message += f"Lookup({x.key}: {x.value}) " - # if denied: - # # message = [f"{i}" for i in message] - # # message = "\n".join(message) - # message_class = "danger" - # return {"message": message, "class": message_class} - - if request.user.is_anonymous: - sizes = settings.OPENSEARCH_MAIN_SIZES_ANON - else: - sizes = settings.OPENSEARCH_MAIN_SIZES - if not size: - if "size" in query_params: - size = query_params["size"] - if size not in sizes: - message = "Size is not permitted" - message_class = "danger" - return {"message": message, "class": message_class} - else: - size = 20 - source = None - if "source" in query_params: - source = query_params["source"] - - if source in settings.OPENSEARCH_SOURCES_RESTRICTED: - if not request.user.has_perm("core.restricted_sources"): - message = "Access denied" - message_class = "danger" - return {"message": message, "class": message_class} - elif source not in settings.OPENSEARCH_MAIN_SOURCES: - message = "Invalid source" - message_class = "danger" - return {"message": message, "class": message_class} - - if source == "all": - source = None # the next block will populate it - - if source: - sources = [source] - else: - sources = settings.OPENSEARCH_MAIN_SOURCES - if request.user.has_perm("core.restricted_sources"): - for source_iter in settings.OPENSEARCH_SOURCES_RESTRICTED: - sources.append(source_iter) - - add_top_tmp = {"bool": {"should": []}} - for source_iter in sources: - add_top_tmp["bool"]["should"].append({"match_phrase": {"src": source_iter}}) - add_top.append(add_top_tmp) - - # date_query = False - if set({"from_date", "to_date", "from_time", "to_time"}).issubset( - query_params.keys() - ): - from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z" - to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z" - range_query = { - "range": { - "ts": { - "gt": from_ts, - "lt": to_ts, - } - } - } - add_top.append(range_query) - - # if date_query: - # if settings.DELAY_RESULTS: - # if source not in settings.SAFE_SOURCES: - # if request.user.has_perm("core.bypass_delay"): - # add_top.append(range_query) - # else: - # delay_as_ts = datetime.now() - timedelta( - # days=settings.DELAY_DURATION - # ) - # lt_as_ts = datetime.strptime( - # range_query["range"]["ts"]["lt"], "%Y-%m-%dT%H:%MZ" - # ) - # if lt_as_ts > delay_as_ts: - # range_query["range"]["ts"][ - # "lt" - # ] = f"now-{settings.DELAY_DURATION}d" - # add_top.append(range_query) - # else: - # add_top.append(range_query) - # else: - # if settings.DELAY_RESULTS: - # if source not in settings.SAFE_SOURCES: - # if not request.user.has_perm("core.bypass_delay"): - # range_query = { - # "range": { - # "ts": { - # # "gt": , - # "lt": f"now-{settings.DELAY_DURATION}d", - # } - # } - # } - # add_top.append(range_query) - - if "sorting" in 
-        sorting = query_params["sorting"]
-        if sorting not in ("asc", "desc", "none"):
-            message = "Invalid sort"
-            message_class = "danger"
-            return {"message": message, "class": message_class}
-        if sorting in ("asc", "desc"):
-            sort = [
-                {
-                    "ts": {
-                        "order": sorting,
-                    }
-                }
-            ]
-
-    if "check_sentiment" in query_params:
-        if "sentiment_method" not in query_params:
-            message = "No sentiment method"
-            message_class = "danger"
-            return {"message": message, "class": message_class}
-        if "sentiment" in query_params:
-            sentiment = query_params["sentiment"]
-            try:
-                sentiment = float(sentiment)
-            except ValueError:
-                message = "Sentiment is not a float"
-                message_class = "danger"
-                return {"message": message, "class": message_class}
-        sentiment_method = query_params["sentiment_method"]
-        range_query_compare = {"range": {"sentiment": {}}}
-        range_query_precise = {
-            "match": {
-                "sentiment": None,
-            }
-        }
-        if sentiment_method == "below":
-            range_query_compare["range"]["sentiment"]["lt"] = sentiment
-            add_top.append(range_query_compare)
-        elif sentiment_method == "above":
-            range_query_compare["range"]["sentiment"]["gt"] = sentiment
-            add_top.append(range_query_compare)
-        elif sentiment_method == "exact":
-            range_query_precise["match"]["sentiment"] = sentiment
-            add_top.append(range_query_precise)
-        elif sentiment_method == "nonzero":
-            range_query_precise["match"]["sentiment"] = 0
-            add_top_negative.append(range_query_precise)
-
-    # Only one of query or query_full can be active at once
-    # We prefer query because it's simpler
-    if "query" in query_params:
-        query = query_params["query"]
-        search_query = construct_query(query, size, tokens=True)
-        query_created = True
-    elif "query_full" in query_params:
-        query_full = query_params["query_full"]
-        # if request.user.has_perm("core.query_search"):
-        search_query = construct_query(query_full, size)
-        query_created = True
-        # else:
-        #     message = "You cannot search by query string"
-        #     message_class = "danger"
-        #     return {"message": message, "class": message_class}
-    else:
-        if custom_query:
-            search_query = custom_query
-
-    if tags:
-        # Get a blank search query
-        if not query_created:
-            search_query = construct_query(None, size, use_query_string=False)
-            query_created = True
-        for tagname, tagvalue in tags.items():
-            add_bool.append({tagname: tagvalue})
-
-    required_any = ["query_full", "query", "tags"]
-    if not any([field in query_params.keys() for field in required_any]):
-        if not custom_query:
-            message = "Empty query!"
-            message_class = "warning"
-            return {"message": message, "class": message_class}
-
-    if add_bool:
-        # if "bool" not in search_query["query"]:
-        #     search_query["query"]["bool"] = {}
-        # if "must" not in search_query["query"]["bool"]:
-        #     search_query["query"]["bool"] = {"must": []}
-
-        for item in add_bool:
-            search_query["query"]["bool"]["must"].append({"match_phrase": item})
-    if add_top:
-        for item in add_top:
-            search_query["query"]["bool"]["must"].append(item)
-    if add_top_negative:
-        for item in add_top_negative:
-            if "must_not" in search_query["query"]["bool"]:
-                search_query["query"]["bool"]["must_not"].append(item)
-            else:
-                search_query["query"]["bool"]["must_not"] = [item]
-    if sort:
-        search_query["sort"] = sort
-
-    if "index" in query_params:
-        index = query_params["index"]
-        if index == "main":
-            index = settings.OPENSEARCH_INDEX_MAIN
-        else:
-            if not request.user.has_perm(f"core.index_{index}"):
-                message = "Not permitted to search by this index"
-                message_class = "danger"
-                return {
-                    "message": message,
-                    "class": message_class,
-                }
-            if index == "meta":
-                index = settings.OPENSEARCH_INDEX_META
-            elif index == "internal":
-                index = settings.OPENSEARCH_INDEX_INT
-            else:
-                message = "Index is not valid."
-                message_class = "danger"
-                return {
-                    "message": message,
-                    "class": message_class,
-                }
-
-    else:
-        index = settings.OPENSEARCH_INDEX_MAIN
-
-    results = run_main_query(
-        client,
-        request.user,  # passed through run_main_query to filter_blacklisted
-        search_query,
-        custom_query=True,
-        index=index,
-        size=size,
-    )
-    if not results:
-        return False
-    if isinstance(results, Exception):
-        message = f"Error: {results.info['error']['root_cause'][0]['type']}"
-        message_class = "danger"
-        return {"message": message, "class": message_class}
-    if len(results["hits"]["hits"]) == 0:
-        message = "No results."
-        message_class = "danger"
-        return {"message": message, "class": message_class}
-
-    results_parsed = parse_results(results)
-
-    if annotate:
-        annotate_results(results_parsed)
-    if "dedup" in query_params:
-        if query_params["dedup"] == "on":
-            dedup = True
-        else:
-            dedup = False
-    else:
-        dedup = False
-
-    if reverse:
-        results_parsed = results_parsed[::-1]
-
-    if dedup:
-        if not dedup_fields:
-            dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
-        results_parsed = dedup_list(results_parsed, dedup_fields)
-
-    # if source not in settings.SAFE_SOURCES:
-    #     if settings.ENCRYPTION:
-    #         encrypt_list(request.user, results_parsed, settings.ENCRYPTION_KEY)
-
-    #     if settings.HASHING:
-    #         hash_list(request.user, results_parsed)
-
-    #     if settings.OBFUSCATION:
-    #         obfuscate_list(request.user, results_parsed)
-
-    #     if settings.RANDOMISATION:
-    #         randomise_list(request.user, results_parsed)
-
-    # process_list(results)
-
-    # IMPORTANT! - DO NOT PASS query_params to the user!
-    context = {
-        "object_list": results_parsed,
-        "card": results["hits"]["total"]["value"],
-        "took": results["took"],
-    }
-    if "redacted" in results:
-        context["redacted"] = results["redacted"]
-    if "exemption" in results:
-        context["exemption"] = results["exemption"]
-    if query:
-        context["query"] = query
-    # if settings.DELAY_RESULTS:
-    #     if source not in settings.SAFE_SOURCES:
-    #         if not request.user.has_perm("core.bypass_delay"):
-    #             context["delay"] = settings.DELAY_DURATION
-    # if settings.RANDOMISATION:
-    #     if source not in settings.SAFE_SOURCES:
-    #         if not request.user.has_perm("core.bypass_randomisation"):
-    #             context["randomised"] = True
-    return context
-
-
-def query_single_result(request, query_params):
-    context = query_results(request, query_params, size=100)
-
-    if not context:
-        return {"message": "Failed to run query", "message_class": "danger"}
-    if "message" in context:
-        return context
-    dedup_set = {item["nick"] for item in context["object_list"]}
-    if dedup_set:
-        context["item"] = context["object_list"][0]
-
-    return context
diff --git a/core/templates/base.html b/core/templates/base.html
index 2f6fc60..a2df37f 100644
--- a/core/templates/base.html
+++ b/core/templates/base.html
@@ -257,7 +257,7 @@
             {% endif %}
             {% if perms.core.use_insights %}
-
+
                 Insights
             {% endif %}
diff --git a/core/util/__init__.py b/core/util/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/core/util/logs.py b/core/util/logs.py
new file mode 100644
index 0000000..045c95f
--- /dev/null
+++ b/core/util/logs.py
@@ -0,0 +1,69 @@
+# Other library imports
+import logging
+
+log = logging.getLogger("util")
+
+debug = True
+
+# Color definitions
+BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+COLORS = {
+    "WARNING": YELLOW,
+    "INFO": WHITE,
+    "DEBUG": BLUE,
+    "CRITICAL": YELLOW,
+    "ERROR": RED,
+}
+RESET_SEQ = "\033[0m"
+COLOR_SEQ = "\033[1;%dm"
+BOLD_SEQ = "\033[1m"
+
+
+def formatter_message(message, use_color=True):
+    if use_color:
+        message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ)
+    else:
+        message = message.replace("$RESET", "").replace("$BOLD", "")
+    return message
+
+
+class ColoredFormatter(logging.Formatter):
+    def __init__(self, msg, use_color=True):
+        logging.Formatter.__init__(self, msg)
+        self.use_color = use_color
+
+    def format(self, record):
+        levelname = record.levelname
+        if self.use_color and levelname in COLORS:
+            levelname_color = (
+                COLOR_SEQ % (30 + COLORS[levelname]) + levelname + RESET_SEQ
+            )
+            record.levelname = levelname_color
+        return logging.Formatter.format(self, record)
+
+
+def get_logger(name):
+
+    # Define the logging format
+    FORMAT = "%(asctime)s %(levelname)18s $BOLD%(name)13s$RESET - %(message)s"
+    COLOR_FORMAT = formatter_message(FORMAT, True)
+    color_formatter = ColoredFormatter(COLOR_FORMAT)
+    # formatter = logging.Formatter(
+
+    # Why is this so complicated?
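+    # logging wires these pieces up separately: the formatter above colours
+    # the levelname, the stream handler below (stderr by default) applies
+    # the formatter, and the named logger sends records to the handler.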
+    ch = logging.StreamHandler()
+    ch.setLevel(logging.INFO)
+    # ch.setFormatter(formatter)
+    ch.setFormatter(color_formatter)
+
+    # Define the logger on the base class
+    log = logging.getLogger(name)
+    log.setLevel(logging.INFO)
+    if debug:
+        log.setLevel(logging.DEBUG)
+        ch.setLevel(logging.DEBUG)
+
+    # Add the handler and stop it being silly and printing everything twice
+    log.addHandler(ch)
+    log.propagate = False
+    return log
diff --git a/core/views/ui/drilldown.py b/core/views/ui/drilldown.py
index ba4162a..24153f1 100644
--- a/core/views/ui/drilldown.py
+++ b/core/views/ui/drilldown.py
@@ -11,10 +11,8 @@ from django_tables2 import SingleTableView
 from rest_framework.parsers import FormParser
 from rest_framework.views import APIView

+from core.db.storage import db
 from core.lib.context import construct_query
-
-# from core.lib.opensearch import query_results
-from core.lib.manticore import query_results
 from core.lib.threshold import (
     annotate_num_chans,
     annotate_num_users,
@@ -87,7 +85,7 @@ def make_graph(results):
         date = str(index)
         graph.append(
             {
-                "text": item.get("tokens", None)
+                "text": item.get("words_noun", None)
                 or item.get("msg", None)
                 or item.get("id"),
                 "nick": item.get("nick", None),
@@ -108,9 +106,9 @@ def drilldown_search(request, return_context=False, template=None):
     else:
         template_name = template
     if request.user.is_anonymous:
-        sizes = settings.MANTICORE_MAIN_SIZES_ANON
+        sizes = settings.MAIN_SIZES_ANON
     else:
-        sizes = settings.MANTICORE_MAIN_SIZES
+        sizes = settings.MAIN_SIZES

     if request.GET:
         if not request.htmx:
@@ -165,7 +163,7 @@
             tags = parse_tags(query_params["tags"])
             extra_params["tags"] = tags

-        context = query_results(request, query_params, **extra_params)
+        context = db.query_results(request, query_params, **extra_params)
         context["unique"] = "results"

         # Valid sizes
@@ -375,7 +373,7 @@ class DrilldownContextModal(APIView):
                 type=type,
                 nicks=nicks_sensitive,
             )
-            results = query_results(
+            results = db.query_results(
                 request,
                 query_params,
                 size=size,
diff --git a/core/views/ui/insights.py b/core/views/ui/insights.py
index 42d373f..b1d22a2 100644
--- a/core/views/ui/insights.py
+++ b/core/views/ui/insights.py
@@ -7,9 +7,9 @@ from django.views import View
 from rest_framework.parsers import FormParser
 from rest_framework.views import APIView

+from core.db.druid import query_single_result
 from core.lib.meta import get_meta
 from core.lib.nicktrace import get_nicks
-from core.lib.opensearch import query_single_result
 from core.lib.threshold import (
     annotate_num_chans,
     annotate_num_users,
diff --git a/core/views/ui/tables.py b/core/views/ui/tables.py
index 8398fe2..ebc6eae 100644
--- a/core/views/ui/tables.py
+++ b/core/views/ui/tables.py
@@ -65,7 +65,13 @@ class DrilldownTable(Table):
     realname = Column()
     server = Column()
     mtype = Column()
-    tokens = Column()
+    # tokens = Column()
+    lang_code = Column()
+    lang_name = Column()
+    words_noun = Column()
+    words_adj = Column()
+    words_verb = Column()
+    words_adv = Column()
     hidden = Column()
     filename = Column()
     file_md5 = Column()
diff --git a/docker/prod/requirements.prod.txt b/docker/prod/requirements.prod.txt
index f752602..6cac89f 100644
--- a/docker/prod/requirements.prod.txt
+++ b/docker/prod/requirements.prod.txt
@@ -14,9 +14,6 @@ cryptography
 siphashc
 redis
 sortedcontainers
-#manticoresearch
 django-debug-toolbar
 django-debug-toolbar-template-profiler
-ujson
 orjson
-pydruid
diff --git a/docker/requirements.dev.txt b/docker/requirements.dev.txt
index ff393e8..f940731 100644
--- a/docker/requirements.dev.txt
+++ b/docker/requirements.dev.txt
@@ -13,9 +13,6 @@ cryptography
 siphashc
 redis
 sortedcontainers
-#manticoresearch
 django-debug-toolbar
 django-debug-toolbar-template-profiler
-ujson
 orjson
-pydruid
diff --git a/requirements.txt b/requirements.txt
index 12133d4..1600408 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,9 +14,6 @@ cryptography
 siphashc
 redis
 sortedcontainers
-#manticoresearch
 django-debug-toolbar
 django-debug-toolbar-template-profiler
-ujson
 orjson
-pydruid
\ No newline at end of file
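
For reviewers, the call pattern after this change: views no longer import
query_results from a backend-specific module, but go through the db object
exported by core/db/storage.py, and modules obtain their logger from
core.util.logs. A minimal sketch of that pattern, inferred from the
drilldown changes above; the view, the logger name, and the query_params
values shown here are hypothetical examples, not part of the patch:

    from core.db.storage import db
    from core.util import logs

    log = logs.get_logger("example")  # hypothetical module name

    def example_search(request):
        # query_params would normally be built from request.GET
        query_params = {"query": "hello", "index": "main"}
        context = db.query_results(request, query_params)
        if "message" in context:
            # the parse_* helpers signal errors as {"message", "class"} dicts
            log.error(f"Query failed: {context['message']}")
        return context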