import logging
from datetime import datetime
from pprint import pprint  # noqa: F401 - no longer called here; import retained

import requests
from django.conf import settings

from core.db import StorageBackend, add_defaults, dedup_list
from core.db.processing import annotate_results, parse_results

logger = logging.getLogger(__name__)


def _error_context(message, message_class="danger"):
    """Return the message/class dict that query_results uses to report a problem."""
    return {"message": message, "class": message_class}


class ManticoreBackend(StorageBackend):
    """Storage backend that searches Manticore through its HTTP JSON endpoint."""

    def __init__(self):
        super().__init__("manticore")

    def initialise(self, **kwargs):
        """
        Initialise the Manticore client.

        Nothing is created here: queries are sent with ``requests``.
        """
        pass  # we use requests

    def construct_query(self, query, size, index, blank=False):
        """
        Accept some query parameters and construct a Manticore JSON search body.

        :param query: value placed in the ``query_string`` clause
        :param size: result limit; falls back to 5 when falsy
        :param index: name of the index to search
        :param blank: when true, no ``query_string`` clause is added, leaving
            an empty ``bool.must`` list for the caller to fill
        :return: dict with ``index``, ``limit`` and ``query`` keys
        """
        if not size:
            size = 5
        query_base = {
            "index": index,
            "limit": size,
            "query": {"bool": {"must": []}},
        }
        if not blank:
            query_base["query"]["bool"]["must"].append({"query_string": query})
        return query_base

    def run_query(self, client, user, search_query):
        """
        POST ``search_query`` to ``{MANTICORE_URL}/json/search``.

        ``client`` and ``user`` are accepted for signature compatibility and
        are not used by this backend.

        :return: the decoded JSON response body, or ``None`` when the request
            fails or the body is not valid JSON (the failure is logged)
        """
        try:
            response = requests.post(
                f"{settings.MANTICORE_URL}/json/search", json=search_query
            )
            # query_results indexes the result like a dict, so hand back the
            # decoded body rather than the Response object.
            return response.json()
        except (requests.RequestException, ValueError):
            logger.exception("Manticore search request failed")
            return None

    def query_results(
        self,
        request,
        query_params,
        size=None,
        annotate=True,
        custom_query=False,
        reverse=False,
        dedup=False,
        dedup_fields=None,
        tags=None,
    ):
        """
        Validate the search parameters, run the search and build the context.

        :param request: request whose ``user`` is used for permission checks
        :param query_params: mapping of search parameters; ``add_defaults`` is
            applied to it first
        :param size: result limit; when falsy it is read from ``query_params``
            (and checked against the permitted sizes) or defaults to 20
        :param annotate: when true, ``annotate_results`` is run on the results
        :param custom_query: pre-built search body used when ``query_params``
            has no ``query`` key
        :param reverse: when true, the parsed results are reversed
        :param dedup: deduplicate results; a ``dedup`` key in ``query_params``
            takes precedence (``"on"`` enables, anything else disables)
        :param dedup_fields: fields passed to ``dedup_list``; a default list is
            used when empty
        :param tags: mapping of field name to value, each added as a ``match``
            clause
        :return: on success a dict with ``object_list``, ``card``, ``took`` and
            optionally ``cache``; on any validation or query failure a dict
            with ``message`` and ``class`` keys
        """
        add_bool = []
        add_top = []
        add_top_negative = []
        sort = None
        search_query = None
        query_created = False

        add_defaults(query_params)

        # Check size
        if request.user.is_anonymous:
            sizes = settings.MANTICORE_MAIN_SIZES_ANON
        else:
            sizes = settings.MANTICORE_MAIN_SIZES
        if not size:
            if "size" in query_params:
                size = query_params["size"]
                if size not in sizes:
                    return _error_context("Size is not permitted")
                size = int(size)
            else:
                size = 20

        # Check index
        if "index" in query_params:
            index = query_params["index"]
            if index == "main":
                index = settings.MANTICORE_INDEX_MAIN
            else:
                if not request.user.has_perm(f"core.index_{index}"):
                    return _error_context("Not permitted to search by this index")
                if index == "meta":
                    index = settings.MANTICORE_INDEX_META
                elif index == "internal":
                    index = settings.MANTICORE_INDEX_INT
                else:
                    return _error_context("Index is not valid.")
        else:
            index = settings.MANTICORE_INDEX_MAIN

        # Create the search query
        if "query" in query_params:
            search_query = self.construct_query(query_params["query"], size, index)
            query_created = True
        elif custom_query:
            search_query = custom_query

        if tags:
            # Get a blank search query
            # NOTE(review): this also replaces custom_query when tags are given
            # without a "query" parameter - confirm that is intended.
            if not query_created:
                search_query = self.construct_query(None, size, index, blank=True)
                query_created = True
            for tagname, tagvalue in tags.items():
                add_bool.append({tagname: tagvalue})

        required_any = ["query_full", "query", "tags"]
        if not any(field in query_params for field in required_any):
            if not custom_query:
                return _error_context("Empty query!", "warning")

        # A "query_full"/"tags" parameter alone passes the check above without
        # any search body having been built; report it instead of crashing.
        if search_query is None:
            return _error_context("Empty query!", "warning")

        # Check for a source
        source = None
        if "source" in query_params:
            source = query_params["source"]

            if source in settings.SOURCES_RESTRICTED:
                if not request.user.has_perm("core.restricted_sources"):
                    return _error_context("Access denied")
            elif source not in settings.MAIN_SOURCES:
                return _error_context("Invalid source")

            if source == "all":
                source = None  # the next block will populate it

        if source:
            sources = [source]
        else:
            sources = list(settings.MAIN_SOURCES)
            if request.user.has_perm("core.restricted_sources"):
                sources.extend(settings.SOURCES_RESTRICTED)

        # Only filter by source when the selection is narrower than every
        # configured source.
        total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
        if len(sources) != total_sources:
            add_top.append(
                {"bool": {"should": [{"equals": {"src": name}} for name in sources]}}
            )

        # Date/time range
        if {"from_date", "to_date", "from_time", "to_time"}.issubset(
            query_params.keys()
        ):
            from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
            to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
            # NOTE(review): strptime returns naive datetimes, so timestamp()
            # interprets them in the process's local timezone despite the
            # literal "Z" - confirm this matches how "ts" is stored.
            try:
                from_ts = int(
                    datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ").timestamp()
                )
                to_ts = int(datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ").timestamp())
            except (ValueError, OverflowError, OSError):
                return _error_context("Invalid date or time")
            add_top.append({"range": {"ts": {"gt": from_ts, "lt": to_ts}}})

        # Sorting
        if "sorting" in query_params:
            sorting = query_params["sorting"]
            if sorting not in ("asc", "desc", "none"):
                return _error_context("Invalid sort")
            if sorting in ("asc", "desc"):
                sort = [{"ts": {"order": sorting}}]

        # Sentiment handling
        if "check_sentiment" in query_params:
            if "sentiment_method" not in query_params:
                return _error_context("No sentiment method")
            sentiment = None
            if "sentiment" in query_params:
                try:
                    sentiment = float(query_params["sentiment"])
                except (TypeError, ValueError):
                    return _error_context("Sentiment is not a float")
            sentiment_method = query_params["sentiment_method"]
            # Every method except "nonzero" compares against the given value.
            if sentiment is None and sentiment_method in ("below", "above", "exact"):
                return _error_context("No sentiment value")
            if sentiment_method == "below":
                add_top.append({"range": {"sentiment": {"lt": sentiment}}})
            elif sentiment_method == "above":
                add_top.append({"range": {"sentiment": {"gt": sentiment}}})
            elif sentiment_method == "exact":
                add_top.append({"match": {"sentiment": sentiment}})
            elif sentiment_method == "nonzero":
                add_top_negative.append({"match": {"sentiment": 0}})

        # Merge the collected clauses into the bool query. setdefault is used
        # because a custom_query may not contain "bool"/"must" yet.
        must_clauses = [{"match": item} for item in add_bool] + add_top
        if must_clauses or add_top_negative:
            bool_query = search_query.setdefault("query", {}).setdefault("bool", {})
            if must_clauses:
                bool_query.setdefault("must", []).extend(must_clauses)
            if add_top_negative:
                bool_query.setdefault("must_not", []).extend(add_top_negative)

        if sort:
            search_query["sort"] = sort

        logger.debug("Manticore search query: %s", search_query)
        results = self.run_query(
            # initialise() creates no client for this backend, so tolerate the
            # attribute being absent; run_query does not use it.
            getattr(self, "client", None),
            request.user,
            search_query,
        )
        if not results:
            return _error_context("Error running query")
        if "error" in results:
            return _error_context(results["error"])

        results_parsed = parse_results(results)
        if annotate:
            annotate_results(results_parsed)

        # A "dedup" request parameter overrides the dedup argument; without
        # it the argument is honoured.
        if "dedup" in query_params:
            dedup = query_params["dedup"] == "on"

        if reverse:
            results_parsed = results_parsed[::-1]

        if dedup:
            if not dedup_fields:
                dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
            results_parsed = dedup_list(results_parsed, dedup_fields)

        context = {
            "object_list": results_parsed,
            "card": results["hits"]["total"],
            "took": results["took"],
        }
        if "cache" in results:
            context["cache"] = results["cache"]
        return context