import logging

import orjson
import requests
from django.conf import settings

from core.db import StorageBackend
from core.db.processing import parse_druid
from core.views import helpers

logger = logging.getLogger(__name__)


class DruidBackend(StorageBackend):
    """Storage backend that builds Druid native ``scan`` queries.

    Queries are plain dicts posted as JSON to the Druid HTTP endpoint with
    ``requests``; no Druid client library is used.
    """

    def __init__(self):
        super().__init__("Druid")

    def initialise(self, **kwargs):
        """No-op: there is no client object to set up."""
        # self.client = PyDruid("http://broker:8082", "druid/v2")
        pass  # we use requests

    def construct_context_query(
        self, index, net, channel, src, num, size, type=None, nicks=None
    ):
        """Build a scan query selecting the context around a message.

        :param index: data source to query; ``"internal"`` switches on the
            special handling driven by ``channel`` and ``type``
        :param net: if truthy, required value of the ``net`` dimension
        :param channel: if truthy, required value of the ``channel`` dimension
        :param src: accepted for interface compatibility; not used here
        :param num: if truthy, required value of the ``num`` dimension
        :param size: row limit of the scan query
        :param type: ``"znc"`` or ``"auth"`` select the special cases for the
            ``"internal"`` index (the name shadows the builtin but is part of
            the public signature)
        :param nicks: optional iterable of nicks, any of which may match
        :return: the query dict
        """
        # ``index`` must be passed by keyword: the third positional parameter
        # of construct_query is ``blank``, so passing it positionally together
        # with ``blank=True`` raises TypeError.
        search_query = self.construct_query(None, size, blank=True, index=index)

        extra_must = []
        extra_should = []
        extra_should2 = []
        if num:
            extra_must.append({"num": num})
        if net:
            extra_must.append({"net": net})
        if channel:
            extra_must.append({"channel": channel})
        if nicks:
            extra_should2.extend({"nick": nick} for nick in nicks)

        types = ["msg", "notice", "action", "kick", "topic", "mode"]
        channel_clause = {"channel": channel}

        if index == "internal":
            if channel == "*status" or type == "znc":
                # Drop the channel constraint and match on type/nick instead.
                if channel_clause in extra_must:
                    extra_must.remove(channel_clause)
                extra_should.append({"type": "znc"})
                extra_should.append({"type": "self"})
                # Replaces any nick clauses collected above.
                extra_should2 = [{"type": "znc"}, {"nick": channel}]
            elif type == "auth":
                # The guard previously tested {"match": {"channel": ...}},
                # which is never in the list, so the channel constraint was
                # never removed. Test the clause that is actually stored.
                if channel_clause in extra_must:
                    extra_must.remove(channel_clause)
                extra_should.append({"type": "query"})
                extra_should.append({"nick": channel})
                # Replaces any nick clauses collected above.
                extra_should2 = [{"nick": channel}, {"type": "self"}]
            else:
                extra_should.extend({"mtype": ctype} for ctype in types)
        else:
            extra_should.extend({"type": ctype} for ctype in types)

        if extra_must:
            self.add_type("and", search_query, extra_must)
        if extra_should:
            self.add_type("or", search_query, extra_should)
        if extra_should2:
            self.add_type("or", search_query, extra_should2)
        return search_query

    def construct_query(self, query, size, blank=False, **kwargs):
        """Build the base scan query.

        :param query: text searched for (case-insensitive contains) in the
            ``msg`` dimension; ignored when ``blank`` is true
        :param size: row limit
        :param blank: when true, return the query without any filter
        :param kwargs: ``index`` names the data source (``None`` if omitted)
        :return: the query dict
        """
        search_query = {
            "limit": size,
            "queryType": "scan",
            "dataSource": kwargs.get("index"),
            "intervals": ["1999-01-01/2999-01-01"],
        }
        if blank:
            return search_query

        search_query["filter"] = {
            "type": "and",
            "fields": [
                {
                    "type": "search",
                    "dimension": "msg",
                    "query": {
                        "type": "insensitive_contains",
                        "value": query,
                    },
                },
            ],
        }
        return search_query

    def parse(self, response):
        """Return ``response`` converted by ``parse_druid``."""
        return parse_druid(response)

    def run_query(self, user, search_query):
        """POST ``search_query`` to Druid and return the decoded JSON reply.

        :param user: accepted for interface compatibility; not used here
        :param search_query: query dict, sent as the JSON request body
        :return: the response body decoded with ``orjson``
        """
        # The pretty-printed query used to be computed and thrown away; it is
        # only useful for debugging, so only build it when it will be logged.
        if logger.isEnabledFor(logging.DEBUG):
            pretty = orjson.dumps(search_query, option=orjson.OPT_INDENT_2)
            logger.debug("Running Druid query: %s", pretty.decode())

        # NOTE(review): no timeout is set, so this call can block
        # indefinitely if the endpoint does not answer.
        response = requests.post("http://druid:8082/druid/v2", json=search_query)
        return orjson.loads(response.text)

    def filter_blacklisted(self, user, response):
        """No-op: nothing is filtered by this backend."""
        pass

    def query_results(
        self,
        request,
        query_params,
        size=None,
        annotate=True,
        custom_query=False,
        reverse=False,
        dedup=False,
        dedup_fields=None,
        tags=None,
    ):
        """Build a query from ``query_params``, run it and process the rows.

        Whenever one of the ``parse_*`` helpers returns a dict (or the parsed
        query contains a ``"message"`` key) that value is returned to the
        caller immediately instead of running the query -- presumably an
        error payload; confirm against ``StorageBackend``.

        :param request: request object; ``request.user`` is used for the size
            limits, index/source parsing and for running the query
        :param query_params: query parameters; defaults are filled in place
            by ``helpers.add_defaults``
        :param size: row limit; parsed from ``query_params`` when falsy
        :param annotate: forwarded to ``process_results``
        :param custom_query: forwarded to ``parse_query``
        :param reverse: forwarded to ``process_results``
        :param dedup: forwarded to ``process_results``
        :param dedup_fields: forwarded to ``process_results``
        :param tags: forwarded to ``parse_query``
        :return: the processed results, or the early-return dict described
            above
        """
        add_bool = []
        add_in = {}

        helpers.add_defaults(query_params)

        # Now, run the helpers for SIQTSRSS/ADR
        # S - Size
        # I - Index
        # Q - Query
        # T - Tags
        # S - Source
        # R - Ranges
        # S - Sort
        # S - Sentiment
        # A - Annotate
        # D - Dedup
        # R - Reverse

        # S - Size
        if request.user.is_anonymous:
            sizes = settings.MAIN_SIZES_ANON
        else:
            sizes = settings.MAIN_SIZES
        if not size:
            size = self.parse_size(query_params, sizes)
            if isinstance(size, dict):
                return size

        # I - Index
        index = self.parse_index(request.user, query_params)
        if isinstance(index, dict):
            return index

        # Q/T - Query/Tags
        search_query = self.parse_query(
            query_params, tags, size, custom_query, add_bool, index=index
        )
        # Query should be a dict, so check if it contains message here
        if "message" in search_query:
            return search_query

        # S - Sources
        sources = self.parse_source(request.user, query_params)
        if isinstance(sources, dict):
            return sources
        total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
        # Only restrict by source when the selection is not "everything".
        if len(sources) != total_sources:
            add_in["src"] = sources

        # R - Ranges
        from_ts, to_ts = self.parse_date_time(query_params)
        if from_ts:
            search_query["intervals"] = [f"{from_ts}/{to_ts}"]

        # S - Sort
        sort = self.parse_sort(query_params)
        if isinstance(sort, dict):
            return sort
        if sort:
            search_query["order"] = sort

        # S - Sentiment
        sentiment_r = self.parse_sentiment(query_params)
        if isinstance(sentiment_r, dict):
            return sentiment_r
        if sentiment_r:
            sentiment_method, sentiment = sentiment_r
            self.add_filter(search_query)
            search_query["filter"]["fields"].append(
                self._build_sentiment_filter(sentiment_method, sentiment)
            )

        # Add in the additional information we already populated
        if add_bool:
            self.add_type("and", search_query, add_bool)
        if add_in:
            self.add_in(search_query, add_in)

        response = self.query(request.user, search_query)

        # A/D/R - Annotate/Dedup/Reverse
        return self.process_results(
            response,
            annotate=annotate,
            dedup=dedup,
            dedup_fields=dedup_fields,
            reverse=reverse,
        )

    @staticmethod
    def _build_sentiment_filter(sentiment_method, sentiment):
        """Return the Druid filter for one sentiment constraint.

        :param sentiment_method: ``"below"``, ``"above"``, ``"exact"`` or
            ``"nonzero"``; any other value yields a bound filter without
            limits, as before
        :param sentiment: the value compared against (unused for
            ``"nonzero"``)
        :return: a filter dict on the ``sentiment`` dimension
        """
        bound = {
            "type": "bound",
            "dimension": "sentiment",
            # Bound filters compare lexicographically unless told otherwise.
            "ordering": "numeric",
        }
        if sentiment_method == "below":
            bound["upper"] = sentiment
        elif sentiment_method == "above":
            bound["lower"] = sentiment
        elif sentiment_method == "exact":
            bound["lower"] = sentiment
            bound["upper"] = sentiment
        elif sentiment_method == "nonzero":
            # The strict (-0.0001, 0.0001) range selects values that are
            # (approximately) zero, so it has to be negated to mean nonzero.
            bound["lower"] = -0.0001
            bound["upper"] = 0.0001
            bound["lowerStrict"] = True
            bound["upperStrict"] = True
            return {"type": "not", "field": bound}
        return bound

    def add_filter(self, search_query):
        """Ensure ``search_query`` has a top-level ``and`` filter."""
        if "filter" not in search_query:
            search_query["filter"] = {
                "type": "and",
                "fields": [],
            }

    def add_in(self, search_query, add_in):
        """Append one ``in`` filter per ``dimension: values`` pair.

        :param search_query: query dict, modified in place
        :param add_in: mapping of dimension name to the accepted values
        """
        self.add_filter(search_query)
        fields = search_query["filter"]["fields"]
        for dimension, values in add_in.items():
            fields.append({"type": "in", "dimension": dimension, "values": values})

    def add_type(self, gate, search_query, add_bool):
        """Append a group of ``selector`` filters joined by ``gate``.

        :param gate: filter type of the group, e.g. ``"and"`` or ``"or"``
        :param search_query: query dict, modified in place
        :param add_bool: iterable of ``{dimension: value}`` dicts; every
            pair becomes one selector in the group
        """
        self.add_filter(search_query)
        selectors = [
            {"type": "selector", "dimension": key, "value": value}
            for item in add_bool
            for key, value in item.items()
        ]
        search_query["filter"]["fields"].append({"type": gate, "fields": selectors})

    def check_valid_query(self, query_params, custom_query):
        """No-op: every query is accepted."""
        # We can do blank queries with this data source
        pass