neptune/core/db/druid.py

271 lines
8.7 KiB
Python
Raw Normal View History

2022-09-27 14:15:08 +00:00
import logging
import orjson
import requests
from django.conf import settings
from core.db import StorageBackend
from core.db.processing import parse_druid
from core.views import helpers
logger = logging.getLogger(__name__)
class DruidBackend(StorageBackend):
def __init__(self):
super().__init__("Druid")
def initialise(self, **kwargs):
# self.client = PyDruid("http://broker:8082", "druid/v2")
pass # we use requests
2022-09-30 06:22:22 +00:00
def construct_context_query(
self, index, net, channel, src, num, size, type=None, nicks=None
):
search_query = self.construct_query(None, size, index, blank=True)
extra_must = []
extra_should = []
extra_should2 = []
if num:
extra_must.append({"num": num})
if net:
extra_must.append({"net": net})
if channel:
extra_must.append({"channel": channel})
if nicks:
for nick in nicks:
extra_should2.append({"nick": nick})
types = ["msg", "notice", "action", "kick", "topic", "mode"]
if index == "internal":
if channel == "*status" or type == "znc":
if {"channel": channel} in extra_must:
extra_must.remove({"channel": channel})
extra_should2 = []
# Type is one of msg or notice
# extra_should.append({"match": {"mtype": "msg"}})
# extra_should.append({"match": {"mtype": "notice"}})
extra_should.append({"type": "znc"})
extra_should.append({"type": "self"})
extra_should2.append({"type": "znc"})
extra_should2.append({"nick": channel})
elif type == "auth":
if {"match": {"channel": channel}} in extra_must:
extra_must.remove({"channel": channel})
extra_should2 = []
extra_should2.append({"nick": channel})
# extra_should2.append({"match": {"mtype": "msg"}})
# extra_should2.append({"match": {"mtype": "notice"}})
extra_should.append({"type": "query"})
extra_should2.append({"type": "self"})
extra_should.append({"nick": channel})
else:
for ctype in types:
extra_should.append({"mtype": ctype})
else:
for ctype in types:
extra_should.append({"type": ctype})
if extra_must:
self.add_type("and", search_query, extra_must)
if extra_should:
self.add_type("or", search_query, extra_should)
if extra_should2:
self.add_type("or", search_query, extra_should2)
return search_query
2022-11-23 18:15:42 +00:00
def construct_query(self, query, size, blank=False, **kwargs):
index = kwargs.get("index")
2022-09-27 14:15:08 +00:00
search_query = {
"limit": size,
"queryType": "scan",
"dataSource": index,
2022-09-30 06:22:22 +00:00
"intervals": ["1999-01-01/2999-01-01"],
}
2022-09-27 14:15:08 +00:00
2022-09-30 06:22:22 +00:00
base_filter = {
"type": "and",
"fields": [],
}
to_add = {
"type": "search",
"dimension": "msg",
"query": {
"type": "insensitive_contains",
"value": query,
2022-09-27 14:15:08 +00:00
},
}
if blank:
return search_query
else:
2022-09-30 06:22:22 +00:00
search_query["filter"] = base_filter
2022-09-27 14:15:08 +00:00
search_query["filter"]["fields"].append(to_add)
return search_query
def parse(self, response):
parsed = parse_druid(response)
return parsed
def run_query(self, user, search_query):
2022-09-30 06:22:22 +00:00
ss = orjson.dumps(search_query, option=orjson.OPT_INDENT_2)
ss = ss.decode()
2022-10-04 20:47:37 +00:00
response = requests.post("http://druid:8082/druid/v2", json=search_query)
2022-09-27 14:15:08 +00:00
response = orjson.loads(response.text)
return response
def filter_blacklisted(self, user, response):
pass
def query_results(
self,
request,
query_params,
size=None,
annotate=True,
custom_query=False,
reverse=False,
dedup=False,
dedup_fields=None,
tags=None,
):
add_bool = []
2022-09-30 06:22:22 +00:00
add_in = {}
2022-09-27 14:15:08 +00:00
helpers.add_defaults(query_params)
2022-09-30 06:22:22 +00:00
# Now, run the helpers for SIQTSRSS/ADR
# S - Size
# I - Index
# Q - Query
# T - Tags
# S - Source
# R - Ranges
# S - Sort
# S - Sentiment
# A - Annotate
# D - Dedup
# R - Reverse
# S - Size
2022-09-27 14:15:08 +00:00
if request.user.is_anonymous:
sizes = settings.MAIN_SIZES_ANON
else:
sizes = settings.MAIN_SIZES
if not size:
size = self.parse_size(query_params, sizes)
if isinstance(size, dict):
return size
2022-09-30 06:22:22 +00:00
# I - Index
2022-09-27 14:15:08 +00:00
index = self.parse_index(request.user, query_params)
if isinstance(index, dict):
return index
2022-09-30 06:22:22 +00:00
# Q/T - Query/Tags
search_query = self.parse_query(
2022-11-23 18:15:42 +00:00
query_params, tags, size, custom_query, add_bool, index=index
2022-09-30 06:22:22 +00:00
)
# Query should be a dict, so check if it contains message here
if "message" in search_query:
2022-09-27 14:15:08 +00:00
return search_query
2022-09-30 06:22:22 +00:00
# S - Sources
2022-09-27 14:15:08 +00:00
sources = self.parse_source(request.user, query_params)
2022-09-30 06:22:22 +00:00
if isinstance(sources, dict):
return sources
total_count = len(sources)
total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
if total_count != total_sources:
add_in["src"] = sources
# R - Ranges
from_ts, to_ts = self.parse_date_time(query_params)
if from_ts:
addendum = f"{from_ts}/{to_ts}"
search_query["intervals"] = [addendum]
# S - Sort
sort = self.parse_sort(query_params)
if isinstance(sort, dict):
return sort
if sort:
search_query["order"] = sort
# S - Sentiment
sentiment_r = self.parse_sentiment(query_params)
if isinstance(sentiment_r, dict):
return sentiment_r
if sentiment_r:
sentiment_method, sentiment = sentiment_r
sentiment_query = {"type": "bound", "dimension": "sentiment"}
if sentiment_method == "below":
sentiment_query["upper"] = sentiment
elif sentiment_method == "above":
sentiment_query["lower"] = sentiment
elif sentiment_method == "exact":
sentiment_query["lower"] = sentiment
sentiment_query["upper"] = sentiment
elif sentiment_method == "nonzero":
sentiment_query["lower"] = -0.0001
sentiment_query["upper"] = 0.0001
sentiment_query["lowerStrict"] = True
sentiment_query["upperStrict"] = True
# add_bool.append(sentiment_query)
self.add_filter(search_query)
search_query["filter"]["fields"].append(sentiment_query)
# Add in the additional information we already populated
2022-09-27 14:15:08 +00:00
if add_bool:
2022-09-30 06:22:22 +00:00
self.add_type("and", search_query, add_bool)
if add_in:
self.add_in(search_query, add_in)
2022-09-27 14:15:08 +00:00
response = self.query(request.user, search_query)
2022-09-30 06:22:22 +00:00
# A/D/R - Annotate/Dedup/Reverse
self.process_results(
response,
annotate=annotate,
dedup=dedup,
dedup_fields=dedup_fields,
reverse=reverse,
)
2022-09-27 14:15:08 +00:00
# ss = orjson.dumps(list(response), option=orjson.OPT_INDENT_2)
# ss = ss.decode()
# print(ss)
# print("PARSED", results_parsed)
# return results_parsed
context = response
return context
2022-09-30 06:22:22 +00:00
def add_filter(self, search_query):
if "filter" not in search_query:
search_query["filter"] = {
"type": "and",
"fields": [],
}
def add_in(self, search_query, add_in):
self.add_filter(search_query)
for key, value in add_in.items():
to_add = {"type": "in", "dimension": key, "values": value}
search_query["filter"]["fields"].append(to_add)
def add_type(self, gate, search_query, add_bool):
top_level_bool = {"type": gate, "fields": []}
self.add_filter(search_query)
for item in add_bool:
for key, value in item.items():
to_add = {"type": "selector", "dimension": key, "value": value}
top_level_bool["fields"].append(to_add)
search_query["filter"]["fields"].append(top_level_bool)
def check_valid_query(self, query_params, custom_query):
# We can do blank queries with this data source
pass