neptune/core/db/__init__.py

262 lines
9.5 KiB
Python

import random
import string
import time
from abc import ABC, abstractmethod
from math import floor, log10
import orjson
from django.conf import settings
from siphashc import siphash
from core import r
from core.db.processing import annotate_results
from core.util import logs
def remove_defaults(query_params):
    """
    Drop every parameter that merely restates its drilldown default.

    A key is removed when it also appears in
    ``settings.DRILLDOWN_DEFAULT_PARAMS`` and its value compares equal to the
    default stored there. The mapping is modified in place; nothing is
    returned.
    """
    # Collect the redundant names first so the mapping is never resized while
    # it is being iterated.
    redundant = [
        name
        for name, current in query_params.items()
        if name in settings.DRILLDOWN_DEFAULT_PARAMS
        and current == settings.DRILLDOWN_DEFAULT_PARAMS[name]
    ]
    for name in redundant:
        del query_params[name]
def add_defaults(query_params):
    """
    Fill in every drilldown default that the caller did not supply.

    Each entry of ``settings.DRILLDOWN_DEFAULT_PARAMS`` whose key is absent
    from ``query_params`` is copied over. Keys that are already present keep
    their current value. The mapping is modified in place; nothing is
    returned.
    """
    for name, default in settings.DRILLDOWN_DEFAULT_PARAMS.items():
        if name in query_params:
            # Already set by the caller; leave it untouched.
            continue
        query_params[name] = default
def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from a list, keeping first occurrences.

    Two items count as duplicates when they agree on every field named in
    ``check_keys`` that they carry. Wherever one or more duplicates were
    dropped, a marker ``{"type": "control", "hidden": <count>}`` is inserted
    in their place (before the next kept item, or at the end of the list).

    :param data: iterable of dictionaries, processed in order.
    :param check_keys: field names that make up the duplicate key; fields an
        item does not have are left out of its key.
    :return: a new list with the kept items and the control markers.
    :raises TypeError: if a compared field value is unhashable, because the
        keys are stored in a set.
    """
    seen = set()
    out = []
    dup_count = 0
    for item in data:
        # Pair every value with its field name. A key built from bare values
        # would treat {"msg": "a"} and {"nick": "a"} as the same item.
        dedup_key = tuple((key, item[key]) for key in check_keys if key in item)
        if dedup_key in seen:
            dup_count += 1
            continue
        if dup_count:
            # Close the run of duplicates that preceded this item.
            out.append({"type": "control", "hidden": dup_count})
            dup_count = 0
        out.append(item)
        seen.add(dedup_key)
    if dup_count:
        # The input ended in the middle of a run of duplicates.
        out.append({"type": "control", "hidden": dup_count})
    return out
class StorageBackend(ABC):
    """
    Common base class for the search storage backends.

    It holds the engine-independent parts: query validation, cached query
    execution and post-processing of result lists. Subclasses provide the
    engine-specific parts by implementing the abstract methods.
    """

    def __init__(self, name):
        """
        Create the backend logger and set up the cache hash key.

        :param name: passed to ``logs.get_logger`` and echoed in the start-up
            log line.
        """
        self.log = logs.get_logger(name)
        self.log.info(f"Initialising storage backend {name}")
        self.initialise_caching()
        # self.initialise()

    @abstractmethod
    def initialise(self, **kwargs):
        """Backend-specific set-up; must be implemented by subclasses."""
        pass

    def initialise_caching(self):
        """
        Load the hash key used for cache entry names, creating it if needed.

        The key is read from ``cache_hash_key`` in ``r``. When nothing is
        stored there, 16 random lowercase ASCII letters are generated and
        written back under the same name. The resulting string is kept on
        ``self.hash_key``.
        """
        stored_key = r.get("cache_hash_key")
        if not stored_key:
            letters = string.ascii_lowercase
            hash_key = "".join(random.choice(letters) for _ in range(16))
            self.log.debug(f"Created new hash key: {hash_key}")
            r.set("cache_hash_key", hash_key)
        else:
            # The stored value is decoded from ASCII into a string before use.
            hash_key = stored_key.decode("ascii")
            self.log.debug(f"Decoded hash key: {hash_key}")
        self.hash_key = hash_key

    @abstractmethod
    def construct_query(self, **kwargs):
        """
        Build a backend-specific search query; implemented by subclasses.

        NOTE(review): ``parse_query`` calls this as
        ``construct_query(query, size, **kwargs)`` and with ``blank=True``,
        so implementations need to accept those arguments.
        """
        pass

    def parse_query(self, query_params, tags, size, custom_query, add_bool, **kwargs):
        """
        Turn request parameters into a search query for this backend.

        :param query_params: mapping of request parameters; a ``"query"``
            entry, when present, is handed to ``construct_query``.
        :param tags: optional iterable of dictionaries; every key/value pair
            is appended to ``add_bool`` as a one-entry dictionary.
        :param size: forwarded to ``construct_query``.
        :param custom_query: ready-made query used when ``query_params`` has
            no ``"query"`` entry.
        :param add_bool: list that receives the tag filters (mutated in
            place).
        :param kwargs: forwarded to ``construct_query`` and
            ``check_valid_query``. ``bypass_check=True`` skips validation.
        :return: the search query, or the message dictionary produced by
            ``check_valid_query`` when validation fails.
        """
        query_created = False
        if "query" in query_params:
            search_query = self.construct_query(query_params["query"], size, **kwargs)
            query_created = True
        elif custom_query:
            search_query = custom_query
        else:
            search_query = self.construct_query(None, size, blank=True, **kwargs)

        if tags:
            if not query_created:
                # Without a text query, tag filters start from a blank query.
                # This also replaces a custom_query chosen above.
                search_query = self.construct_query(None, size, blank=True, **kwargs)
            for item in tags:
                for tagname, tagvalue in item.items():
                    add_bool.append({tagname: tagvalue})

        if not kwargs.get("bypass_check", False):
            valid = self.check_valid_query(query_params, custom_query, **kwargs)
            if isinstance(valid, dict):
                return valid

        return search_query

    def check_valid_query(self, query_params, custom_query, **kwargs):
        """
        Reject requests that carry nothing to search for.

        :param query_params: mapping of request parameters.
        :param custom_query: ready-made query, if any.
        :param kwargs: accepted and ignored. ``parse_query`` forwards its own
            keyword arguments here, so a fixed signature would raise
            ``TypeError`` whenever any were given.
        :return: ``{"message": ..., "class": ...}`` when ``query_params`` has
            neither a ``"query"`` nor a ``"tags"`` entry and there is no
            ``custom_query``; otherwise ``None``.
        """
        required_any = ("query", "tags")
        if any(field in query_params for field in required_any):
            return None
        if custom_query:
            return None
        return {"message": "Empty query!", "class": "warning"}

    @abstractmethod
    def run_query(self, **kwargs):
        """
        Execute a search query against the backend; implemented by subclasses.

        NOTE(review): ``query`` calls this as
        ``run_query(user, search_query, **kwargs)``, so implementations need
        to accept those arguments.
        """
        pass

    @staticmethod
    def _matches_blacklist(source, blacklisted_types):
        """Return True when any blacklisted field of ``source`` holds a blacklisted value."""
        for blacklisted_type in blacklisted_types:
            if blacklisted_type not in source:
                continue
            content = str(source[blacklisted_type])
            # NOTE(review): the types come from ELASTICSEARCH_BLACKLISTED but
            # the values are looked up in BLACKLISTED -- confirm that both
            # settings exist and share their keys.
            for blacklisted_item in settings.BLACKLISTED[blacklisted_type]:
                if blacklisted_item == content:
                    return True
        return False

    def filter_blacklisted(self, user, response):
        """
        Low level filter to take the raw search response and remove
        objects from it we want to keep secret.

        The response is mutated in place:

        * ``response["redacted"]`` counts the blacklisted hits that did not
          already carry an ``"exemption"`` marker.
        * ``response["exemption"]`` is ``True`` for superusers, else ``None``.
        * Blacklisted hits are removed for anonymous users and for users
          without the ``core.bypass_blacklist`` permission. For other users
          the hit is kept and gets ``"exemption": True`` in its data.

        Each hit is handled at most once, and the hit list is always compacted
        before returning, so no ``None`` placeholders are left behind.

        :param user: object providing ``is_superuser``, ``is_anonymous`` and
            ``has_perm``.
        :param response: raw response with a ``["hits"]["hits"]`` list.
        :return: ``False`` when a hit has neither ``"_source"`` nor
            ``"fields"`` (processing stops at that hit); otherwise ``None``.
        """
        response["redacted"] = 0
        response["exemption"] = None
        if user.is_superuser:
            response["exemption"] = True

        hits = response["hits"]["hits"]
        # The blacklist settings are only consulted when there are hits.
        blacklisted_types = (
            list(settings.ELASTICSEARCH_BLACKLISTED.keys()) if hits else []
        )
        malformed = False
        if blacklisted_types:
            for index, item in enumerate(hits):
                if "_source" in item:
                    data_index = "_source"
                elif "fields" in item:
                    data_index = "fields"
                else:
                    # Unknown hit layout: stop here and report it.
                    malformed = True
                    break
                source = item[data_index]
                if not self._matches_blacklist(source, blacklisted_types):
                    continue
                # Let the UI know something was redacted
                if "exemption" not in source:
                    response["redacted"] += 1
                if user.is_anonymous or not user.has_perm("core.bypass_blacklist"):
                    # Set it to None for now so the indexes stay aligned.
                    hits[index] = None
                else:
                    source["exemption"] = True

        # Actually get rid of all the things we set to None
        response["hits"]["hits"] = [hit for hit in hits if hit]
        if malformed:
            return False

    @staticmethod
    def _round_sig(value, figures=3):
        """
        Round ``value`` to ``figures`` significant figures.

        Zero is returned as ``0.0`` because ``log10(0)`` is undefined and
        would raise ``ValueError``.
        """
        if value == 0:
            return 0.0
        return round(value, figures - int(floor(log10(abs(value)))) - 1)

    def _elapsed_ms(self, start):
        """Milliseconds of ``time.process_time()`` since ``start``, to 3 significant figures."""
        return self._round_sig((time.process_time() - start) * 1000)

    def query(self, user, search_query, **kwargs):
        """
        Run a search query, using the per-user result cache when enabled.

        With ``settings.CACHE`` on, the query is serialised with sorted keys,
        hashed with ``self.hash_key`` and looked up under
        ``query_cache.<user id>.<hash>``. On a miss the query is run, parsed
        with ``self.parse`` and stored under that name with
        ``settings.CACHE_TIMEOUT`` as expiry.

        :param user: requesting user; ``user.id`` is part of the cache key and
            the user is passed to ``run_query``.
        :param search_query: query produced by ``parse_query``.
        :param kwargs: forwarded to ``run_query``.
        :return: one of
            ``{"object_list": ..., "took": ..., "cache": True}`` (cache hit),
            ``{"object_list": ..., "took": ...}`` (fresh result),
            ``{"message": ..., "class": ...}`` with an optional ``"took"``
            (error or no results), the raw response when it has ``"error"``
            but no ``"errorMessage"``, or ``None`` when the response's
            ``"took"`` is ``None``.
        """
        # NOTE(review): process_time() measures CPU time of this process, so
        # time spent blocked on the backend is not part of "took" -- confirm
        # that this is the intended figure.
        start = time.process_time()
        cache_key = None
        if settings.CACHE:
            # Sort the keys so the hash is the same
            query_normalised = orjson.dumps(search_query, option=orjson.OPT_SORT_KEYS)
            query_hash = siphash(self.hash_key, query_normalised)
            cache_key = f"query_cache.{user.id}.{query_hash}"
            cache_hit = r.get(cache_key)
            if cache_hit:
                return {
                    "object_list": orjson.loads(cache_hit),
                    "took": self._elapsed_ms(start),
                    "cache": True,
                }

        response = self.run_query(user, search_query, **kwargs)

        # For Elasticsearch
        if isinstance(response, Exception):
            message = f"Error: {response.info['error']['root_cause'][0]['type']}"
            return {"message": message, "class": "danger"}
        if "took" in response and response["took"] is None:
            return None
        # NOTE(review): a response without "hits" raises KeyError here, before
        # the Druid error handling below is reached -- confirm the ordering.
        if len(response["hits"]["hits"]) == 0:
            return {
                "message": "No results.",
                "class": "danger",
                "took": self._elapsed_ms(start),
            }

        # For Druid
        if "error" in response:
            if "errorMessage" in response:
                return {
                    "message": response["errorMessage"],
                    "class": "danger",
                }
            return response

        # Removed for now, no point given we have restricted indexes
        # self.filter_blacklisted(user, response)

        # Parse the response
        response_parsed = self.parse(response)

        # Write cache
        if settings.CACHE and cache_key is not None:
            r.set(cache_key, orjson.dumps(response_parsed))
            r.expire(cache_key, settings.CACHE_TIMEOUT)

        return {"object_list": response_parsed, "took": self._elapsed_ms(start)}

    @abstractmethod
    def query_results(self, **kwargs):
        """High-level query entry point; implemented by subclasses."""
        pass

    def process_results(self, response, **kwargs):
        """
        Apply optional post-processing steps to a list of results.

        :param response: list of results.
        :param kwargs: switches, applied in this order when truthy:
            ``annotate`` calls ``annotate_results(response)``;
            ``reverse`` reverses the list in place;
            ``dedup`` passes the list through ``dedup_list`` using
            ``dedup_fields`` (default: msg, nick, ident, host, net, channel).
        :return: the processed list. With ``dedup`` this is a new list,
            otherwise it is the list that was passed in.
        """
        if kwargs.get("annotate"):
            annotate_results(response)
        if kwargs.get("reverse"):
            response.reverse()
        if kwargs.get("dedup"):
            dedup_fields = kwargs.get("dedup_fields")
            if not dedup_fields:
                dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
            response = dedup_list(response, dedup_fields)
        return response

    @abstractmethod
    def parse(self, response):
        """Convert a raw backend response into the result list; implemented by subclasses."""
        pass