You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
neptune/core/lib/manticore.py

363 lines
12 KiB
Python

import logging
import random
import string
import time
from datetime import datetime
from math import floor, log10
from pprint import pprint
import manticoresearch
import requests
import ujson
from django.conf import settings
from siphashc import siphash
from core import r
from core.lib.processing import annotate_results, filter_blacklisted, parse_results
from core.views import helpers
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def initialise_manticore():
    """
    Build the Manticore API client objects.

    The client is configured with ``settings.MANTICORE_URL`` as its host.
    Returns a ``(api_client, search_api)`` tuple, where ``search_api`` is a
    ``manticoresearch.SearchApi`` bound to ``api_client``.
    """
    api_client = manticoresearch.ApiClient(
        manticoresearch.Configuration(host=settings.MANTICORE_URL)
    )
    search_api = manticoresearch.SearchApi(api_client)
    return (api_client, search_api)
# Create the shared Manticore client objects once, at import time.
api_client, client = initialise_manticore()
def initialise_caching():
    """
    Return the key used to hash cached queries.

    The key is read from the ``cache_hash_key`` entry of the ``r`` store and
    decoded as ASCII. When no such entry exists, a new key of 16 random
    lowercase letters is generated, written back to the store and returned.
    """
    stored_key = r.get("cache_hash_key")
    if stored_key:
        # The store hands the value back as bytes; callers want a str.
        hash_key = stored_key.decode("ascii")
        logger.debug(f"Decoded hash key: {hash_key}")
        return hash_key

    alphabet = string.ascii_lowercase
    hash_key = "".join(random.choice(alphabet) for _ in range(16))
    logger.debug(f"Created new hash key: {hash_key}")
    r.set("cache_hash_key", hash_key)
    return hash_key
# Load (or create) the cache hash key at import time; run_query() uses it
# to hash normalised queries into cache keys.
hash_key = initialise_caching()
def construct_query(query, size, index, blank=False):
    """
    Build the base search request dict for the given index.

    :param query: value placed in the ``query_string`` clause
    :param size: result limit; any falsy value falls back to 5
    :param index: name of the index to search
    :param blank: when true, no ``query_string`` clause is added and the
        ``must`` list is left empty for the caller to fill in
    :return: dict with ``index``, ``limit`` and a ``query.bool.must`` list
    """
    must_clauses = [] if blank else [{"query_string": query}]
    return {
        "index": index,
        "limit": size or 5,
        "query": {"bool": {"must": must_clauses}},
    }
def run_query(client, user, search_query):
    """
    Run a search request, going through the query cache when enabled.

    When ``settings.MANTICORE_CACHE`` is set, the request is serialised with
    sorted keys, hashed with the module-level ``hash_key`` and looked up in
    the ``r`` store under ``query_cache.<user.id>.<hash>``. On a hit the
    cached response is returned with ``took`` replaced by the lookup time in
    milliseconds (rounded to 3 significant figures) and ``cache`` set to True.

    On a miss (or with caching disabled) the request is POSTed to
    ``<settings.MANTICORE_URL>/json/search``.

    :param client: unused; kept so existing callers keep working
    :param user: user whose ``id`` namespaces the cache entry and who is
        passed to ``filter_blacklisted``
    :param search_query: JSON-serialisable search request dict
    :return: the response dict; a dict whose only key is ``error`` when the
        backend reports one; or None when the response's ``took`` is None
    """
    cache_key = None
    if settings.MANTICORE_CACHE:
        start = time.process_time()
        query_normalised = ujson.dumps(search_query, sort_keys=True)
        query_hash = siphash(hash_key, query_normalised)
        cache_key = f"query_cache.{user.id}.{query_hash}"
        cache_hit = r.get(cache_key)
        if cache_hit:
            response = ujson.loads(cache_hit)
            time_took = (time.process_time() - start) * 1000
            # Round to 3 significant figures. log10(0) raises ValueError, so
            # an elapsed time of exactly zero is reported as-is.
            if time_took:
                time_took = round(
                    time_took, 3 - int(floor(log10(abs(time_took)))) - 1
                )
            response["took"] = time_took
            response["cache"] = True
            return response

    # NOTE(review): no timeout is passed, so this call can block
    # indefinitely if the backend stops responding - confirm intended.
    http_response = requests.post(
        f"{settings.MANTICORE_URL}/json/search", json=search_query
    )
    response = ujson.loads(http_response.text)
    # A body holding nothing but "error" is handed back for the caller to
    # report; it is neither filtered nor cached.
    if "error" in response and len(response.keys()) == 1:
        return response
    if "took" in response and response["took"] is None:
        return None
    # Return value is ignored; presumably filters `response` in place.
    filter_blacklisted(user, response)

    # Write cache
    if cache_key is not None:
        r.set(cache_key, ujson.dumps(response))
        r.expire(cache_key, settings.MANTICORE_CACHE_TIMEOUT)
    return response
def _query_message(text, level="danger"):
    """Build the message dict query_results() returns instead of results."""
    return {"message": text, "class": level}


def query_results(
    request,
    query_params,
    size=None,
    annotate=True,
    custom_query=False,
    reverse=False,
    dedup=False,
    dedup_fields=None,
    tags=None,
):
    """
    Validate the search parameters, build the search request, run it and
    return a template context.

    :param request: request whose ``user`` is used for permission checks and
        is passed on to ``run_query``
    :param query_params: mapping of search parameters; defaults are filled in
        by ``helpers.add_defaults``. Keys read here: ``size``, ``index``,
        ``query``, ``source``, ``from_date``/``from_time``/``to_date``/
        ``to_time``, ``sorting``, ``check_sentiment``, ``sentiment_method``,
        ``sentiment`` and ``dedup``
    :param size: result limit; when falsy it is taken from ``query_params``
        (and must be one of the permitted sizes) or defaults to 20
    :param annotate: pass the parsed results through ``annotate_results``
    :param custom_query: pre-built search request dict, used when
        ``query_params`` has no ``query``. Filters are appended to it in
        place. It is replaced by a blank query when ``tags`` is given
    :param reverse: reverse the order of the parsed results
    :param dedup: de-duplicate the results; a ``dedup`` key in
        ``query_params`` takes precedence (``"on"`` enables it, anything
        else disables it)
    :param dedup_fields: fields compared when de-duplicating; defaults to
        msg, nick, ident, host, net and channel
    :param tags: mapping of field name to value, each added as a ``match``
        clause
    :return: ``{"object_list", "card", "took"}`` (plus ``"cache"`` when
        present in the results), or ``{"message", "class"}`` when
        validation or the search itself fails
    """
    add_bool = []
    add_top = []
    add_top_negative = []
    sort = None
    search_query = None
    source = None
    helpers.add_defaults(query_params)

    # Check size
    if request.user.is_anonymous:
        sizes = settings.MANTICORE_MAIN_SIZES_ANON
    else:
        sizes = settings.MANTICORE_MAIN_SIZES
    if not size:
        if "size" in query_params:
            size = query_params["size"]
            if size not in sizes:
                return _query_message("Size is not permitted")
            size = int(size)
        else:
            size = 20

    # Check index
    if "index" in query_params:
        index = query_params["index"]
        if index == "main":
            index = settings.MANTICORE_INDEX_MAIN
        else:
            if not request.user.has_perm(f"core.index_{index}"):
                return _query_message("Not permitted to search by this index")
            if index == "meta":
                index = settings.MANTICORE_INDEX_META
            elif index == "internal":
                index = settings.MANTICORE_INDEX_INT
            else:
                return _query_message("Index is not valid.")
    else:
        index = settings.MANTICORE_INDEX_MAIN

    # Create the search query
    query_created = "query" in query_params
    if query_created:
        search_query = construct_query(query_params["query"], size, index)
    elif custom_query:
        search_query = custom_query

    if tags:
        # Get a blank search query
        # NOTE(review): this also replaces a supplied custom_query - confirm
        # that is intended.
        if not query_created:
            search_query = construct_query(None, size, index, blank=True)
        add_bool.extend({tagname: tagvalue} for tagname, tagvalue in tags.items())

    required_any = ("query_full", "query", "tags")
    if not custom_query and not any(field in query_params for field in required_any):
        return _query_message("Empty query!", "warning")
    # A request can pass the check above (e.g. it only has "query_full")
    # without any branch having built a query; report that as an empty
    # query rather than failing on an unbound variable further down.
    if search_query is None:
        return _query_message("Empty query!", "warning")

    # Check for a source
    if "source" in query_params:
        source = query_params["source"]
        if source in settings.MANTICORE_SOURCES_RESTRICTED:
            if not request.user.has_perm("core.restricted_sources"):
                return _query_message("Access denied")
        elif source not in settings.MANTICORE_MAIN_SOURCES:
            return _query_message("Invalid source")
        if source == "all":
            source = None  # the next block will populate it

    if source:
        sources = [source]
    else:
        sources = list(settings.MANTICORE_MAIN_SOURCES)
        if request.user.has_perm("core.restricted_sources"):
            sources.extend(settings.MANTICORE_SOURCES_RESTRICTED)

    # Only filter by source when the selection is narrower than every
    # configured source.
    total_sources = len(settings.MANTICORE_MAIN_SOURCES) + len(
        settings.MANTICORE_SOURCES_RESTRICTED
    )
    if len(sources) != total_sources:
        add_top.append(
            {
                "bool": {
                    "should": [
                        {"equals": {"src": source_iter}} for source_iter in sources
                    ]
                }
            }
        )

    # Date/time range
    if {"from_date", "to_date", "from_time", "to_time"}.issubset(
        query_params.keys()
    ):
        from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
        to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
        try:
            from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ")
            to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ")
        except ValueError:
            # Malformed input is reported like every other validation
            # failure instead of propagating as an exception.
            return _query_message("Invalid date or time")
        # NOTE(review): these datetimes are naive, so timestamp() interprets
        # them in the process's local timezone even though the string carries
        # a "Z" suffix - confirm whether UTC was intended.
        add_top.append(
            {
                "range": {
                    "ts": {
                        "gt": int(from_ts.timestamp()),
                        "lt": int(to_ts.timestamp()),
                    }
                }
            }
        )

    # Sorting
    if "sorting" in query_params:
        sorting = query_params["sorting"]
        if sorting not in ("asc", "desc", "none"):
            return _query_message("Invalid sort")
        if sorting in ("asc", "desc"):
            sort = [{"ts": {"order": sorting}}]

    # Sentiment handling
    if "check_sentiment" in query_params:
        if "sentiment_method" not in query_params:
            return _query_message("No sentiment method")
        sentiment_method = query_params["sentiment_method"]
        sentiment = None
        if "sentiment" in query_params:
            try:
                sentiment = float(query_params["sentiment"])
            except (TypeError, ValueError):
                return _query_message("Sentiment is not a float")
        # Every method except "nonzero" compares against a value, so one
        # must have been supplied.
        if sentiment is None and sentiment_method in ("below", "above", "exact"):
            return _query_message("No sentiment value")

        if sentiment_method == "below":
            add_top.append({"range": {"sentiment": {"lt": sentiment}}})
        elif sentiment_method == "above":
            add_top.append({"range": {"sentiment": {"gt": sentiment}}})
        elif sentiment_method == "exact":
            add_top.append({"match": {"sentiment": sentiment}})
        elif sentiment_method == "nonzero":
            # Expressed as "must not match 0".
            add_top_negative.append({"match": {"sentiment": 0}})

    # Merge the collected clauses into the search query
    if add_bool or add_top:
        must = search_query["query"]["bool"]["must"]
        must.extend({"match": item} for item in add_bool)
        must.extend(add_top)
    if add_top_negative:
        search_query["query"]["bool"].setdefault("must_not", []).extend(
            add_top_negative
        )
    if sort:
        search_query["sort"] = sort

    logger.debug("Running search query: %s", search_query)
    results = run_query(
        client,
        request.user,  # passed on to filter_blacklisted by run_query
        search_query,
    )
    if not results:
        return _query_message("Error running query")
    if "error" in results:
        return _query_message(results["error"])

    results_parsed = parse_results(results)
    if annotate:
        annotate_results(results_parsed)

    # An explicit "dedup" query parameter wins; otherwise the dedup argument
    # supplied by the caller is honoured.
    if "dedup" in query_params:
        dedup = query_params["dedup"] == "on"

    if reverse:
        results_parsed = results_parsed[::-1]

    if dedup:
        if not dedup_fields:
            dedup_fields = ["msg", "nick", "ident", "host", "net", "channel"]
        results_parsed = helpers.dedup_list(results_parsed, dedup_fields)

    context = {
        "object_list": results_parsed,
        "card": results["hits"]["total"],
        "took": results["took"],
    }
    if "cache" in results:
        context["cache"] = results["cache"]
    return context