neptune/core/db/druid.py

154 lines
4.3 KiB
Python

import logging
import random
import string
import time
from datetime import datetime
from math import floor, log10
from pprint import pprint
import orjson
import requests
from django.conf import settings
from siphashc import siphash
from core import r
from core.db import StorageBackend
from core.db.processing import parse_druid
from core.views import helpers
logger = logging.getLogger(__name__)
class DruidBackend(StorageBackend):
    """Storage backend that sends native scan queries to a Druid broker over HTTP."""

    # Native-query endpoint of the Druid broker; override in a subclass if needed.
    BROKER_URL = "http://broker:8082/druid/v2"
    # (connect, read) timeout in seconds handed to requests. Without a timeout
    # a stalled broker would block the calling thread indefinitely.
    REQUEST_TIMEOUT = (5, 300)

    def __init__(self):
        super().__init__("Druid")

    def initialise(self, **kwargs):
        """Do nothing: no client object is needed, queries go through requests."""

    def construct_query(self, query, size, index, blank=False):
        """
        Build the body of a Druid scan query.

        :param query: text to look for in the ``msg`` dimension
                      (case-insensitive substring match); ignored when
                      ``blank`` is true
        :param size: value for the scan query's ``limit``
        :param index: value for the scan query's ``dataSource``
        :param blank: when true, return the query with an empty filter list
        :return: the query as a dict, ready to be serialised to JSON
        """
        search_query = {
            "limit": size,
            "queryType": "scan",
            "dataSource": index,
            # Further clauses are appended to "fields" (see add_bool()).
            "filter": {
                "type": "and",
                "fields": [],
            },
            # "resultFormat", "columns" and "batchSize" are deliberately not
            # set, so the broker's defaults apply.
            # Interval wide enough that it does not restrict results by time.
            "intervals": ["1000-01-01/3000-01-01"],
        }
        if blank:
            return search_query

        # This must be a plain dict. A trailing comma after the literal would
        # turn it into a 1-tuple, which serialises as a nested JSON array
        # rather than a filter object.
        text_filter = {
            "type": "search",
            "dimension": "msg",
            "query": {
                "type": "insensitive_contains",
                "value": query,
            },
        }
        search_query["filter"]["fields"].append(text_filter)
        return search_query

    def parse(self, response):
        """Run ``parse_druid`` over a decoded broker response and return the result."""
        parsed = parse_druid(response)
        logger.debug("Parsed %d result(s) from Druid response", len(parsed))
        return parsed

    def run_query(self, user, search_query):
        """
        POST ``search_query`` to the broker and return the decoded JSON body.

        :param user: not used by this backend
        :param search_query: query dict, sent as the JSON request body
        :return: the response body decoded with orjson
        :raises requests.exceptions.RequestException: on connection failure or
                when ``REQUEST_TIMEOUT`` is exceeded
        """
        http_response = requests.post(
            self.BROKER_URL,
            json=search_query,
            timeout=self.REQUEST_TIMEOUT,
        )
        # The HTTP status is intentionally not checked here: an error body is
        # decoded and returned like any other response.
        response = orjson.loads(http_response.text)
        logger.debug("Druid response has %d top-level item(s)", len(response))
        return response

    def filter_blacklisted(self, user, response):
        """Not implemented for this backend; always returns ``None``."""
        pass

    def query_results(
        self,
        request,
        query_params,
        size=None,
        annotate=True,
        custom_query=False,
        reverse=False,
        dedup=False,
        dedup_fields=None,
        tags=None,
    ):
        """
        Validate the request parameters, build a Druid query and execute it.

        ``annotate``, ``reverse``, ``dedup`` and ``dedup_fields`` are accepted
        but not used by this backend.

        :param request: request object; ``request.user`` is used for the size
                        limits and passed to the parse helpers and the query
        :param query_params: query parameters; defaults are filled in place by
                             ``helpers.add_defaults``
        :param size: result limit; parsed from ``query_params`` when falsy
        :param custom_query: forwarded to ``parse_query``
        :param tags: forwarded to ``parse_query``
        :return: a dict returned by one of the parse helpers when it stops
                 processing early, otherwise whatever ``self.query`` returns
        """
        add_bool = []
        helpers.add_defaults(query_params)

        # Check size
        if request.user.is_anonymous:
            sizes = settings.MAIN_SIZES_ANON
        else:
            sizes = settings.MAIN_SIZES
        if not size:
            size = self.parse_size(query_params, sizes)
            if isinstance(size, dict):
                return size

        # Check index
        index = self.parse_index(request.user, query_params)
        if isinstance(index, dict):
            return index

        # Create the search query
        search_query = self.parse_query(
            query_params, tags, size, index, custom_query, add_bool
        )
        # NOTE(review): construct_query() returns a dict, so if parse_query()
        # hands back a constructed query on success, this check returns it
        # unexecuted. Confirm what parse_query() returns on success versus
        # failure.
        if isinstance(search_query, dict):
            return search_query

        # TODO: source restrictions are not applied to the Druid query yet.
        # The permitted sources are resolved here, but no filter on the "src"
        # dimension is added when the user may only see a subset of
        # settings.MAIN_SOURCES + settings.SOURCES_RESTRICTED.
        # NOTE(review): the result of parse_source() is not checked for an
        # error dict, unlike parse_size()/parse_index() above - confirm.
        self.parse_source(request.user, query_params)

        logger.debug("Query size is %s", size)
        if add_bool:
            self.add_bool(search_query, add_bool)

        return self.query(request.user, search_query)

    def add_bool(self, search_query, add_bool):
        """
        Attach ``add_bool`` to ``search_query`` as a ``bool``/``should`` clause.

        The clause is appended to ``filter.fields`` (created if missing); when
        the query has no ``filter`` at all, the clause becomes the filter.
        ``search_query`` is modified in place.
        """
        # NOTE(review): "bool"/"should" is Elasticsearch syntax. Druid native
        # filters express a disjunction as {"type": "or", "fields": [...]};
        # confirm the broker accepts this clause.
        clause = {"bool": {"should": add_bool}}
        if "filter" not in search_query:
            search_query["filter"] = clause
            return
        search_query["filter"].setdefault("fields", []).append(clause)