Implement Druid DB fetching

This commit is contained in:
Mark Veidemanis 2022-09-30 07:22:22 +01:00
parent 202a13cccb
commit bb00475029
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
5 changed files with 234 additions and 90 deletions

View File

@ -1,6 +1,7 @@
import random
import string
import time
from datetime import datetime
from math import floor, log10
import orjson
@ -10,6 +11,7 @@ from siphashc import siphash
from core import r
from core.db.processing import annotate_results
from core.util import logs
from core.views import helpers
class StorageBackend(object):
@ -71,6 +73,15 @@ class StorageBackend(object):
index = settings.INDEX_META
elif index == "internal":
index = settings.INDEX_INT
elif index == "restricted":
if not user.has_perm("core.restricted_sources"):
message = "Not permitted to search by this index"
message_class = "danger"
return {
"message": message,
"class": message_class,
}
index = settings.INDEX_RESTRICTED
else:
message = "Index is not valid."
message_class = "danger"
@ -83,6 +94,7 @@ class StorageBackend(object):
return index
def parse_query(self, query_params, tags, size, index, custom_query, add_bool):
query_created = False
if "query" in query_params:
query = query_params["query"]
search_query = self.construct_query(query, size, index)
@ -90,6 +102,8 @@ class StorageBackend(object):
else:
if custom_query:
search_query = custom_query
else:
search_query = self.construct_query(None, size, index, blank=True)
if tags:
# Get a blank search query
@ -99,6 +113,13 @@ class StorageBackend(object):
for tagname, tagvalue in tags.items():
add_bool.append({tagname: tagvalue})
valid = self.check_valid_query(query_params, custom_query)
if isinstance(valid, dict):
return valid
return search_query
def check_valid_query(self, query_params, custom_query):
required_any = ["query", "tags"]
if not any([field in query_params.keys() for field in required_any]):
if not custom_query:
@ -106,8 +127,6 @@ class StorageBackend(object):
message_class = "warning"
return {"message": message, "class": message_class}
return search_query
def parse_source(self, user, query_params):
if "source" in query_params:
source = query_params["source"]
@ -133,11 +152,59 @@ class StorageBackend(object):
for source_iter in settings.SOURCES_RESTRICTED:
sources.append(source_iter)
if "all" in sources:
sources.remove("all")
return sources
def parse_sort(self, query_params):
sort = None
if "sorting" in query_params:
sorting = query_params["sorting"]
if sorting not in ("asc", "desc", "none"):
message = "Invalid sort"
message_class = "danger"
return {"message": message, "class": message_class}
if sorting == "asc":
sort = "ascending"
elif sorting == "desc":
sort = "descending"
return sort
def parse_date_time(self, query_params):
if set({"from_date", "to_date", "from_time", "to_time"}).issubset(
query_params.keys()
):
from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ")
to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ")
return (from_ts, to_ts)
return (None, None)
def parse_sentiment(self, query_params):
sentiment = None
if "check_sentiment" in query_params:
if "sentiment_method" not in query_params:
message = "No sentiment method"
message_class = "danger"
return {"message": message, "class": message_class}
if "sentiment" in query_params:
sentiment = query_params["sentiment"]
try:
sentiment = float(sentiment)
except ValueError:
message = "Sentiment is not a float"
message_class = "danger"
return {"message": message, "class": message_class}
sentiment_method = query_params["sentiment_method"]
return (sentiment_method, sentiment)
def filter_blacklisted(self, user, response):
"""
Low level filter to take the raw OpenSearch response and remove
Low level filter to take the raw search response and remove
objects from it we want to keep secret.
Does not return, the object is mutated in place.
"""
@ -197,11 +264,28 @@ class StorageBackend(object):
cache_hit = r.get(f"query_cache.{user.id}.{hash}")
if cache_hit:
response = orjson.loads(cache_hit)
response["cache"] = True
return response
print("CACHE HIT", response)
time_took = (time.process_time() - start) * 1000
# Round to 3 significant figures
time_took_rounded = round(
time_took, 3 - int(floor(log10(abs(time_took)))) - 1
)
return {
"object_list": response,
"took": time_took_rounded,
"cache": True,
}
response = self.run_query(user, search_query)
if "error" in response and len(response.keys()) == 1:
return response
if "error" in response:
if "errorMessage" in response:
context = {
"message": response["errorMessage"],
"class": "danger",
}
return context
else:
return response
# response = response.to_dict()
# print("RESP", response)
if "took" in response:
@ -209,15 +293,15 @@ class StorageBackend(object):
return None
self.filter_blacklisted(user, response)
# Write cache
if settings.CACHE:
to_write_cache = orjson.dumps(response)
r.set(f"query_cache.{user.id}.{hash}", to_write_cache)
r.expire(f"query_cache.{user.id}.{hash}", settings.CACHE_TIMEOUT)
# Parse the response
response_parsed = self.parse(response)
# Write cache
if settings.CACHE:
to_write_cache = orjson.dumps(response_parsed)
r.set(f"query_cache.{user.id}.{hash}", to_write_cache)
r.expire(f"query_cache.{user.id}.{hash}", settings.CACHE_TIMEOUT)
time_took = (time.process_time() - start) * 1000
# Round to 3 significant figures
time_took_rounded = round(time_took, 3 - int(floor(log10(abs(time_took)))) - 1)
@ -226,9 +310,15 @@ class StorageBackend(object):
def query_results(self, **kwargs):
raise NotImplementedError
def process_results(self, **kwargs):
def process_results(self, response, **kwargs):
if kwargs.get("annotate"):
annotate_results(kwargs["results"])
annotate_results(response)
if kwargs.get("dedup"):
response = response[::-1]
if kwargs.get("dedup"):
if not kwargs.get("dedup_fields"):
dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
response = helpers.dedup_list(response, dedup_fields)
def parse(self, response):
raise NotImplementedError

View File

@ -1,17 +1,9 @@
import logging
import random
import string
import time
from datetime import datetime
from math import floor, log10
from pprint import pprint
import orjson
import requests
from django.conf import settings
from siphashc import siphash
from core import r
from core.db import StorageBackend
from core.db.processing import parse_druid
from core.views import helpers
@ -32,30 +24,26 @@ class DruidBackend(StorageBackend):
"limit": size,
"queryType": "scan",
"dataSource": index,
"filter": {
"type": "and",
"fields": [
],
},
# "resultFormat": "list",
# "columns":[],
"intervals": ["1000-01-01/3000-01-01"],
# "batchSize": 20480,
"intervals": ["1999-01-01/2999-01-01"],
}
to_add = {
"type": "search",
"dimension": "msg",
"query": {
"type": "insensitive_contains",
"value": query,
},
},
base_filter = {
"type": "and",
"fields": [],
}
to_add = {
"type": "search",
"dimension": "msg",
"query": {
"type": "insensitive_contains",
"value": query,
},
}
if blank:
return search_query
else:
search_query["filter"] = base_filter
search_query["filter"]["fields"].append(to_add)
return search_query
@ -65,12 +53,15 @@ class DruidBackend(StorageBackend):
return parsed
def run_query(self, user, search_query):
ss = orjson.dumps(search_query, option=orjson.OPT_INDENT_2)
ss = ss.decode()
print(ss)
response = requests.post("http://broker:8082/druid/v2", json=search_query)
response = orjson.loads(response.text)
print("RESPONSE LEN", len(response))
ss = orjson.dumps(list(response), option=orjson.OPT_INDENT_2)
ss = ss.decode()
print(ss)
# ss = orjson.dumps(response, option=orjson.OPT_INDENT_2)
# ss = ss.decode()
# print(ss)
return response
def filter_blacklisted(self, user, response):
@ -89,12 +80,24 @@ class DruidBackend(StorageBackend):
tags=None,
):
add_bool = []
add_top = []
add_in = {}
helpers.add_defaults(query_params)
# Check size
# Now, run the helpers for SIQTSRSS/ADR
# S - Size
# I - Index
# Q - Query
# T - Tags
# S - Source
# R - Ranges
# S - Sort
# S - Sentiment
# A - Annotate
# D - Dedup
# R - Reverse
# S - Size
if request.user.is_anonymous:
sizes = settings.MAIN_SIZES_ANON
else:
@ -104,37 +107,80 @@ class DruidBackend(StorageBackend):
if isinstance(size, dict):
return size
# Check index
# I - Index
index = self.parse_index(request.user, query_params)
if isinstance(index, dict):
return index
# Create the search query
search_query = self.parse_query(query_params, tags, size, index, custom_query, add_bool)
if isinstance(search_query, dict):
# Q/T - Query/Tags
search_query = self.parse_query(
query_params, tags, size, index, custom_query, add_bool
)
# Query should be a dict, so check if it contains message here
if "message" in search_query:
return search_query
# S - Sources
sources = self.parse_source(request.user, query_params)
# TODO
add_top_tmp = {"bool": {"should": []}}
total_count = 0
for source_iter in sources:
add_top_tmp["bool"]["should"].append({"equals": {"src": source_iter}})
total_count += 1
total_sources = len(settings.MAIN_SOURCES) + len(
settings.SOURCES_RESTRICTED
)
if not total_count == total_sources:
add_top.append(add_top_tmp)
if isinstance(sources, dict):
return sources
total_count = len(sources)
total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
if total_count != total_sources:
add_in["src"] = sources
print("SIZE IS", size)
# R - Ranges
from_ts, to_ts = self.parse_date_time(query_params)
if from_ts:
addendum = f"{from_ts}/{to_ts}"
search_query["intervals"] = [addendum]
# S - Sort
sort = self.parse_sort(query_params)
if isinstance(sort, dict):
return sort
if sort:
search_query["order"] = sort
# S - Sentiment
sentiment_r = self.parse_sentiment(query_params)
if isinstance(sentiment_r, dict):
return sentiment_r
if sentiment_r:
sentiment_method, sentiment = sentiment_r
sentiment_query = {"type": "bound", "dimension": "sentiment"}
if sentiment_method == "below":
sentiment_query["upper"] = sentiment
elif sentiment_method == "above":
sentiment_query["lower"] = sentiment
elif sentiment_method == "exact":
sentiment_query["lower"] = sentiment
sentiment_query["upper"] = sentiment
elif sentiment_method == "nonzero":
sentiment_query["lower"] = -0.0001
sentiment_query["upper"] = 0.0001
sentiment_query["lowerStrict"] = True
sentiment_query["upperStrict"] = True
# add_bool.append(sentiment_query)
self.add_filter(search_query)
search_query["filter"]["fields"].append(sentiment_query)
# Add in the additional information we already populated
if add_bool:
self.add_bool(search_query, add_bool)
self.add_type("and", search_query, add_bool)
if add_in:
self.add_in(search_query, add_in)
response = self.query(request.user, search_query)
# print("RESP", response)
# A/D/R - Annotate/Dedup/Reverse
self.process_results(
response,
annotate=annotate,
dedup=dedup,
dedup_fields=dedup_fields,
reverse=reverse,
)
# ss = orjson.dumps(list(response), option=orjson.OPT_INDENT_2)
# ss = ss.decode()
# print(ss)
@ -143,11 +189,29 @@ class DruidBackend(StorageBackend):
context = response
return context
def add_bool(self, search_query, add_bool):
if "filter" in search_query:
if "fields" in search_query["filter"]:
search_query["filter"]["fields"].append({"bool": {"should": add_bool}})
else:
search_query["filter"]["fields"] = [{"bool": {"should": add_bool}}]
else:
search_query["filter"] = {"bool": {"should": add_bool}}
def add_filter(self, search_query):
if "filter" not in search_query:
search_query["filter"] = {
"type": "and",
"fields": [],
}
def add_in(self, search_query, add_in):
self.add_filter(search_query)
for key, value in add_in.items():
to_add = {"type": "in", "dimension": key, "values": value}
search_query["filter"]["fields"].append(to_add)
def add_type(self, gate, search_query, add_bool):
top_level_bool = {"type": gate, "fields": []}
self.add_filter(search_query)
for item in add_bool:
for key, value in item.items():
to_add = {"type": "selector", "dimension": key, "value": value}
top_level_bool["fields"].append(to_add)
search_query["filter"]["fields"].append(top_level_bool)
def check_valid_query(self, query_params, custom_query):
# We can do blank queries with this data source
pass

View File

@ -1,18 +1,12 @@
import logging
import random
import string
import time
from datetime import datetime
from math import floor, log10
from pprint import pprint
import orjson
import requests
from django.conf import settings
from core import r
from core.db import StorageBackend
from core.db.processing import annotate_results, filter_blacklisted, parse_results
from core.db.processing import annotate_results, parse_results
from core.views import helpers
logger = logging.getLogger(__name__)
@ -120,7 +114,7 @@ class ManticoreBackend(StorageBackend):
# Create the search query
if "query" in query_params:
query = query_params["query"]
search_query = construct_query(query, size, index)
search_query = self.construct_query(query, size, index)
query_created = True
else:
if custom_query:
@ -129,7 +123,7 @@ class ManticoreBackend(StorageBackend):
if tags:
# Get a blank search query
if not query_created:
search_query = construct_query(None, size, index, blank=True)
search_query = self.construct_query(None, size, index, blank=True)
query_created = True
for tagname, tagvalue in tags.items():
add_bool.append({tagname: tagvalue})
@ -171,9 +165,7 @@ class ManticoreBackend(StorageBackend):
for source_iter in sources:
add_top_tmp["bool"]["should"].append({"equals": {"src": source_iter}})
total_count += 1
total_sources = len(settings.MAIN_SOURCES) + len(
settings.SOURCES_RESTRICTED
)
total_sources = len(settings.MAIN_SOURCES) + len(settings.SOURCES_RESTRICTED)
if not total_count == total_sources:
add_top.append(add_top_tmp)
@ -269,8 +261,8 @@ class ManticoreBackend(StorageBackend):
search_query["sort"] = sort
pprint(search_query)
results = run_query(
client,
results = self.run_query(
self.client,
request.user, # passed through run_main_query to filter_blacklisted
search_query,
)

View File

@ -9,7 +9,7 @@ from core.db import StorageBackend
# from json import dumps
# pp = lambda x: print(dumps(x, indent=2))
from core.db.processing import annotate_results, filter_blacklisted, parse_results
from core.db.processing import annotate_results, parse_results
from core.views.helpers import dedup_list

View File

@ -1,7 +1,5 @@
from datetime import datetime
from django.conf import settings
from core.lib.threshold import annotate_num_chans, annotate_num_users, annotate_online