import logging import random import string import time from datetime import datetime from math import floor, log10 from pprint import pprint import manticoresearch import ujson from django.conf import settings from siphashc import siphash from core import r from core.lib.processing import annotate_results, filter_blacklisted, parse_results from core.views import helpers logger = logging.getLogger(__name__) def initialise_manticore(): """ Initialise the Manticore client """ configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308") api_client = manticoresearch.ApiClient(configuration) api_instance = manticoresearch.SearchApi(api_client) return (api_client, api_instance) api_client, client = initialise_manticore() def initialise_caching(): hash_key = r.get("cache_hash_key") if not hash_key: letters = string.ascii_lowercase hash_key = "".join(random.choice(letters) for i in range(16)) logger.debug(f"Created new hash key: {hash_key}") r.set("cache_hash_key", hash_key) else: hash_key = hash_key.decode("ascii") logger.debug(f"Decoded hash key: {hash_key}") return hash_key hash_key = initialise_caching() def construct_query(query, size, index, blank=False): """ Accept some query parameters and construct an OpenSearch query. """ if not size: size = 5 query_base = { "index": index, "limit": size, "query": {"bool": {"must": []}}, } query_string = { "query_string": query, } if not blank: query_base["query"]["bool"]["must"].append(query_string) return query_base def run_query(client, user, search_query): if settings.MANTICORE_CACHE: start = time.process_time() query_normalised = ujson.dumps(search_query, sort_keys=True) hash = siphash(hash_key, query_normalised) cache_hit = r.get(f"query_cache.{user.id}.{hash}") if cache_hit: response = ujson.loads(cache_hit) time_took = (time.process_time() - start) * 1000 # Round to 3 significant figures time_took_rounded = round( time_took, 3 - int(floor(log10(abs(time_took)))) - 1 ) response["took"] = time_took_rounded response["cache"] = True return response response = client.search(search_query) print("PRERESP", response) response = response.to_dict() print("RESP", response) if "took" in response: if response["took"] is None: return None filter_blacklisted(user, response) # Write cache if settings.MANTICORE_CACHE: to_write_cache = ujson.dumps(response) r.set(f"query_cache.{user.id}.{hash}", to_write_cache) r.expire(f"query_cache.{user.id}.{hash}", settings.MANTICORE_CACHE_TIMEOUT) return response def query_results( request, query_params, size=None, annotate=True, custom_query=False, reverse=False, dedup=False, dedup_fields=None, tags=None, ): query = None message = None message_class = None add_bool = [] add_top = [] add_top_negative = [] sort = None query_created = False source = None helpers.add_defaults(query_params) # Check size if request.user.is_anonymous: sizes = settings.MANTICORE_MAIN_SIZES_ANON else: sizes = settings.MANTICORE_MAIN_SIZES if not size: if "size" in query_params: size = query_params["size"] if size not in sizes: message = "Size is not permitted" message_class = "danger" return {"message": message, "class": message_class} size = int(size) else: size = 20 # Check index if "index" in query_params: index = query_params["index"] if index == "main": index = settings.MANTICORE_INDEX_MAIN else: if not request.user.has_perm(f"core.index_{index}"): message = "Not permitted to search by this index" message_class = "danger" return { "message": message, "class": message_class, } if index == "meta": index = settings.MANTICORE_INDEX_META elif index == "internal": index = settings.MANTICORE_INDEX_INT else: message = "Index is not valid." message_class = "danger" return { "message": message, "class": message_class, } else: index = settings.MANTICORE_INDEX_MAIN # Create the search query if "query" in query_params: query = query_params["query"] search_query = construct_query(query, size, index) query_created = True else: if custom_query: search_query = custom_query if tags: # Get a blank search query if not query_created: search_query = construct_query(None, size, index, blank=True) query_created = True for tagname, tagvalue in tags.items(): add_bool.append({tagname: tagvalue}) required_any = ["query_full", "query", "tags"] if not any([field in query_params.keys() for field in required_any]): if not custom_query: message = "Empty query!" message_class = "warning" return {"message": message, "class": message_class} # Check for a source if "source" in query_params: source = query_params["source"] if source in settings.MANTICORE_SOURCES_RESTRICTED: if not request.user.has_perm("core.restricted_sources"): message = "Access denied" message_class = "danger" return {"message": message, "class": message_class} elif source not in settings.MANTICORE_MAIN_SOURCES: message = "Invalid source" message_class = "danger" return {"message": message, "class": message_class} if source == "all": source = None # the next block will populate it if source: sources = [source] else: sources = list(settings.MANTICORE_MAIN_SOURCES) if request.user.has_perm("core.restricted_sources"): for source_iter in settings.MANTICORE_SOURCES_RESTRICTED: sources.append(source_iter) add_top_tmp = {"bool": {"should": []}} total_count = 0 for source_iter in sources: add_top_tmp["bool"]["should"].append({"equals": {"src": source_iter}}) total_count += 1 total_sources = len(settings.MANTICORE_MAIN_SOURCES) + len( settings.MANTICORE_SOURCES_RESTRICTED ) if not total_count == total_sources: add_top.append(add_top_tmp) # Date/time range if set({"from_date", "to_date", "from_time", "to_time"}).issubset( query_params.keys() ): from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z" to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z" from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ") to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ") from_ts = int(from_ts.timestamp()) to_ts = int(to_ts.timestamp()) range_query = { "range": { "ts": { "gt": from_ts, "lt": to_ts, } } } add_top.append(range_query) # Sorting if "sorting" in query_params: sorting = query_params["sorting"] if sorting not in ("asc", "desc", "none"): message = "Invalid sort" message_class = "danger" return {"message": message, "class": message_class} if sorting in ("asc", "desc"): sort = [ { "ts": { "order": sorting, } } ] # Sentiment handling if "check_sentiment" in query_params: if "sentiment_method" not in query_params: message = "No sentiment method" message_class = "danger" return {"message": message, "class": message_class} if "sentiment" in query_params: sentiment = query_params["sentiment"] try: sentiment = float(sentiment) except ValueError: message = "Sentiment is not a float" message_class = "danger" return {"message": message, "class": message_class} sentiment_method = query_params["sentiment_method"] range_query_compare = {"range": {"sentiment": {}}} range_query_precise = { "match": { "sentiment": None, } } if sentiment_method == "below": range_query_compare["range"]["sentiment"]["lt"] = sentiment add_top.append(range_query_compare) elif sentiment_method == "above": range_query_compare["range"]["sentiment"]["gt"] = sentiment add_top.append(range_query_compare) elif sentiment_method == "exact": range_query_precise["match"]["sentiment"] = sentiment add_top.append(range_query_precise) elif sentiment_method == "nonzero": range_query_precise["match"]["sentiment"] = 0 add_top_negative.append(range_query_precise) if add_bool: # if "bool" not in search_query["query"]: # search_query["query"]["bool"] = {} # if "must" not in search_query["query"]["bool"]: # search_query["query"]["bool"] = {"must": []} for item in add_bool: search_query["query"]["bool"]["must"].append({"match": item}) if add_top: for item in add_top: search_query["query"]["bool"]["must"].append(item) if add_top_negative: for item in add_top_negative: if "must_not" in search_query["query"]["bool"]: search_query["query"]["bool"]["must_not"].append(item) else: search_query["query"]["bool"]["must_not"] = [item] if sort: search_query["sort"] = sort pprint(search_query) results = run_query( client, request.user, # passed through run_main_query to filter_blacklisted search_query, ) if not results: message = "Error running query" message_class = "danger" return {"message": message, "class": message_class} # results = results.to_dict() results_parsed = parse_results(results) if annotate: annotate_results(results_parsed) if "dedup" in query_params: if query_params["dedup"] == "on": dedup = True else: dedup = False else: dedup = False if reverse: results_parsed = results_parsed[::-1] if dedup: if not dedup_fields: dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"] results_parsed = helpers.dedup_list(results_parsed, dedup_fields) context = { "object_list": results_parsed, "card": results["hits"]["total"], "took": results["took"], } if "cache" in results: context["cache"] = results["cache"] return context