273 lines
8.6 KiB
Python
273 lines
8.6 KiB
Python
import logging
|
|
|
|
import orjson
|
|
import requests
|
|
from django.conf import settings
|
|
|
|
from core.db import StorageBackend, add_defaults
|
|
from core.db.processing import parse_druid
|
|
from core.lib.parsing import (
|
|
parse_date_time,
|
|
parse_index,
|
|
parse_sentiment,
|
|
parse_size,
|
|
parse_sort,
|
|
parse_source,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DruidBackend(StorageBackend):
    """Storage backend that builds Druid native ``scan`` queries.

    Queries are plain dicts in Druid's native JSON query format and are
    POSTed to the broker with ``requests``; no Druid client library is used.
    """

    def __init__(self):
        super().__init__("druid")

    def initialise(self, **kwargs):
        """Do nothing: there is no client object to set up.

        A PyDruid client pointed at ``http://broker:8082`` was used here
        previously; queries now go through ``requests`` in ``run_query``.
        """
        pass  # we use requests

    def construct_context_query(
        self, index, net, channel, src, num, size, type=None, nicks=None
    ):
        """Build a scan query with selector filters for a message's context.

        Starts from a blank query (no text search) and adds up to three
        filter groups via ``add_type``:

        * an ``and`` group pinning ``num``, ``net`` and ``channel`` (each
          only when truthy),
        * an ``or`` group restricting the message type,
        * a second ``or`` group restricting the nick (and, for some
          ``type`` values, the message type as well).

        :param index: data source name; ``"internal"`` enables the special
            handling of ``type`` / ``*status`` below.
        :param net: value for the ``net`` selector.
        :param channel: value for the ``channel`` selector.
        :param src: accepted for interface compatibility; not used here.
        :param num: value for the ``num`` selector.
        :param size: row limit of the query.
        :param type: message type of the originating message, if any.
        :param nicks: optional iterable of nicks for the second ``or`` group.
        :return: the query dict.
        """
        # ``index`` has to be passed by keyword: the third positional
        # parameter of construct_query() is ``blank``, so passing it
        # positionally together with ``blank=True`` raises TypeError.
        search_query = self.construct_query(None, size, blank=True, index=index)

        extra_must = []
        extra_should = []
        extra_should2 = []
        if num:
            extra_must.append({"num": num})
        if net:
            extra_must.append({"net": net})
        if channel:
            extra_must.append({"channel": channel})
        if nicks:
            extra_should2.extend({"nick": nick} for nick in nicks)

        types = ["msg", "notice", "action", "kick", "topic", "mode"]
        channel_clause = {"channel": channel}

        if index == "internal":
            if channel == "*status" or type == "znc":
                # Not tied to a channel: drop the channel selector and
                # replace any nick restriction collected above.
                if channel_clause in extra_must:
                    extra_must.remove(channel_clause)
                extra_should.append({"type": "znc"})
                extra_should.append({"type": "self"})
                extra_should2 = [{"type": "znc"}, {"nick": channel}]
            elif type == "auth":
                # The membership test must use the same clause shape that
                # was appended above, otherwise the selector is never removed.
                if channel_clause in extra_must:
                    extra_must.remove(channel_clause)
                extra_should.append({"type": "query"})
                extra_should.append({"nick": channel})
                extra_should2 = [{"nick": channel}, {"type": "self"}]
            else:
                # The internal index is filtered on ``mtype`` here, unlike
                # the other indexes which are filtered on ``type``.
                extra_should.extend({"mtype": ctype} for ctype in types)
        else:
            extra_should.extend({"type": ctype} for ctype in types)

        if extra_must:
            self.add_type("and", search_query, extra_must)
        if extra_should:
            self.add_type("or", search_query, extra_should)
        if extra_should2:
            self.add_type("or", search_query, extra_should2)
        return search_query

    def construct_query(self, query, size, blank=False, **kwargs):
        """Build the base Druid ``scan`` query.

        :param query: text to search for in the ``msg`` dimension
            (case-insensitive substring match); ignored when ``blank`` is set.
        :param size: row limit of the query.
        :param blank: when true, return the query without any ``filter``.
        :param kwargs: ``index`` is read as the Druid data source name.
        :return: the query dict.
        """
        search_query = {
            "limit": size,
            "queryType": "scan",
            "dataSource": kwargs.get("index"),
            # Wide default interval; query_results() narrows it when a
            # date range is supplied.
            "intervals": ["1999-01-01/2999-01-01"],
        }
        if blank:
            return search_query

        search_query["filter"] = {
            "type": "and",
            "fields": [
                {
                    "type": "search",
                    "dimension": "msg",
                    "query": {
                        "type": "insensitive_contains",
                        "value": query,
                    },
                }
            ],
        }
        return search_query

    def parse(self, response):
        """Return ``response`` converted by ``parse_druid``."""
        return parse_druid(response)

    def run_query(self, user, search_query):
        """POST ``search_query`` to the Druid broker and return the decoded JSON.

        :param user: accepted for interface compatibility; not used here.
        :param search_query: native Druid query dict.
        :return: the decoded JSON body of the HTTP response.
        """
        response = requests.post("http://druid:8082/druid/v2", json=search_query)
        # Parse the raw bytes: orjson accepts bytes directly, which avoids
        # requests' charset detection on the response body.
        return orjson.loads(response.content)

    def filter_blacklisted(self, user, response):
        """Do nothing: no blacklist filtering is implemented for this backend."""
        pass

    @staticmethod
    def _build_sentiment_filter(sentiment_method, sentiment):
        """Return the Druid filter for one sentiment constraint.

        ``below`` / ``above`` / ``exact`` produce a ``bound`` filter on the
        ``sentiment`` dimension. ``nonzero`` produces the negation of a
        strict bound around zero. An unrecognised method yields a bound
        filter without limits, as before.
        """
        bound = {
            "type": "bound",
            "dimension": "sentiment",
            # Druid compares bounds lexicographically unless told otherwise;
            # sentiment values are numbers.
            "ordering": "numeric",
        }
        if sentiment_method == "below":
            bound["upper"] = sentiment
        elif sentiment_method == "above":
            bound["lower"] = sentiment
        elif sentiment_method == "exact":
            bound["lower"] = sentiment
            bound["upper"] = sentiment
        elif sentiment_method == "nonzero":
            bound["lower"] = -0.0001
            bound["upper"] = 0.0001
            bound["lowerStrict"] = True
            bound["upperStrict"] = True
            # The strict bound matches values that are (almost) zero;
            # "nonzero" is its complement.
            return {"type": "not", "field": bound}
        return bound

    def query_results(
        self,
        request,
        query_params,
        size=None,
        annotate=True,
        custom_query=False,
        reverse=False,
        dedup=False,
        dedup_fields=None,
        tags=None,
    ):
        """Turn request parameters into a Druid query, run it, and post-process.

        Each parsing helper may return a dict instead of its normal value;
        such a dict is returned to the caller unchanged (presumably an
        error/message payload -- confirm against ``core.lib.parsing``).

        :param request: request object; ``request.user`` selects the allowed
            sizes and is passed to the index/source helpers and the query.
        :param query_params: query parameters; defaults are filled in by
            ``add_defaults``.
        :param size: row limit; parsed from ``query_params`` when falsy.
        :param annotate: forwarded to ``process_results``.
        :param custom_query: forwarded to ``parse_query``.
        :param reverse: forwarded to ``process_results``.
        :param dedup: forwarded to ``process_results``.
        :param dedup_fields: forwarded to ``process_results``.
        :param tags: forwarded to ``parse_query``.
        :return: the processed results, or a dict returned early by one of
            the parsing helpers.
        """
        add_bool = []
        add_in = {}

        add_defaults(query_params)

        # Now, run the helpers for SIQTSRSS/ADR
        # S - Size
        # I - Index
        # Q - Query
        # T - Tags
        # S - Source
        # R - Ranges
        # S - Sort
        # S - Sentiment
        # A - Annotate
        # D - Dedup
        # R - Reverse

        # S - Size
        if request.user.is_anonymous:
            sizes = settings.MAIN_SIZES_ANON
        else:
            sizes = settings.MAIN_SIZES
        if not size:
            size = parse_size(query_params, sizes)
            if isinstance(size, dict):
                return size

        # I - Index
        index = parse_index(request.user, query_params)
        if isinstance(index, dict):
            return index

        # Q/T - Query/Tags
        search_query = self.parse_query(
            query_params, tags, size, custom_query, add_bool, index=index
        )
        # Query should be a dict, so check if it contains message here
        if "message" in search_query:
            return search_query

        # S - Sources
        sources = parse_source(request.user, query_params)
        if isinstance(sources, dict):
            return sources
        total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
        # Only filter on source when a subset of all sources was selected.
        if len(sources) != total_sources:
            add_in["src"] = sources

        # R - Ranges
        from_ts, to_ts = parse_date_time(query_params)
        if from_ts:
            search_query["intervals"] = [f"{from_ts}/{to_ts}"]

        # S - Sort
        sort = parse_sort(query_params)
        if isinstance(sort, dict):
            return sort
        if sort:
            search_query["order"] = sort

        # S - Sentiment
        sentiment_r = parse_sentiment(query_params)
        if isinstance(sentiment_r, dict):
            return sentiment_r
        if sentiment_r:
            sentiment_method, sentiment = sentiment_r
            self.add_filter(search_query)
            search_query["filter"]["fields"].append(
                self._build_sentiment_filter(sentiment_method, sentiment)
            )

        # Add in the additional information we already populated
        if add_bool:
            self.add_type("and", search_query, add_bool)
        if add_in:
            self.add_in(search_query, add_in)

        response = self.query(request.user, search_query)

        # A/D/R - Annotate/Dedup/Reverse
        return self.process_results(
            response,
            annotate=annotate,
            dedup=dedup,
            dedup_fields=dedup_fields,
            reverse=reverse,
        )

    def add_filter(self, search_query):
        """Ensure ``search_query`` has a top-level ``and`` filter to append to."""
        if "filter" not in search_query:
            search_query["filter"] = {
                "type": "and",
                "fields": [],
            }

    def add_in(self, search_query, add_in):
        """Append one ``in`` filter per ``dimension -> values`` entry of ``add_in``."""
        self.add_filter(search_query)
        search_query["filter"]["fields"].extend(
            {"type": "in", "dimension": key, "values": value}
            for key, value in add_in.items()
        )

    def add_type(self, gate, search_query, add_bool):
        """Append a ``gate`` (``"and"`` / ``"or"``) group of selector filters.

        :param gate: Druid logical filter type used for the group.
        :param search_query: query dict, modified in place.
        :param add_bool: list of ``{dimension: value}`` dicts; every
            key/value pair becomes one ``selector`` filter in the group.
        """
        self.add_filter(search_query)
        selectors = [
            {"type": "selector", "dimension": key, "value": value}
            for item in add_bool
            for key, value in item.items()
        ]
        search_query["filter"]["fields"].append({"type": gate, "fields": selectors})

    def check_valid_query(self, query_params, custom_query):
        """Accept every query: blank queries are allowed with this data source."""
        # We can do blank queries with this data source
        pass
|