Implement running scheduled rules and check aggregations
This commit is contained in:
@@ -2,7 +2,6 @@ import random
|
||||
import string
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from math import floor, log10
|
||||
|
||||
import orjson
|
||||
@@ -50,10 +49,6 @@ def dedup_list(data, check_keys):
|
||||
return out
|
||||
|
||||
|
||||
class QueryError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class StorageBackend(ABC):
|
||||
def __init__(self, name):
|
||||
self.log = logs.get_logger(name)
|
||||
@@ -82,66 +77,6 @@ class StorageBackend(ABC):
|
||||
def construct_query(self, **kwargs):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def run_query(self, **kwargs):
|
||||
pass
|
||||
|
||||
def parse_size(self, query_params, sizes):
|
||||
if "size" in query_params:
|
||||
size = query_params["size"]
|
||||
if size not in sizes:
|
||||
message = "Size is not permitted"
|
||||
message_class = "danger"
|
||||
return {"message": message, "class": message_class}
|
||||
size = int(size)
|
||||
else:
|
||||
size = 15
|
||||
|
||||
return size
|
||||
|
||||
def parse_index(self, user, query_params, raise_error=False):
|
||||
if "index" in query_params:
|
||||
index = query_params["index"]
|
||||
if index == "main":
|
||||
index = settings.INDEX_MAIN
|
||||
else:
|
||||
if not user.has_perm(f"core.index_{index}"):
|
||||
message = f"Not permitted to search by this index: {index}"
|
||||
if raise_error:
|
||||
raise QueryError(message)
|
||||
message_class = "danger"
|
||||
return {
|
||||
"message": message,
|
||||
"class": message_class,
|
||||
}
|
||||
if index == "meta":
|
||||
index = settings.INDEX_META
|
||||
elif index == "internal":
|
||||
index = settings.INDEX_INT
|
||||
elif index == "restricted":
|
||||
if not user.has_perm("core.restricted_sources"):
|
||||
message = f"Not permitted to search by this index: {index}"
|
||||
if raise_error:
|
||||
raise QueryError(message)
|
||||
message_class = "danger"
|
||||
return {
|
||||
"message": message,
|
||||
"class": message_class,
|
||||
}
|
||||
index = settings.INDEX_RESTRICTED
|
||||
else:
|
||||
message = f"Index is not valid: {index}"
|
||||
if raise_error:
|
||||
raise QueryError(message)
|
||||
message_class = "danger"
|
||||
return {
|
||||
"message": message,
|
||||
"class": message_class,
|
||||
}
|
||||
else:
|
||||
index = settings.INDEX_MAIN
|
||||
return index
|
||||
|
||||
def parse_query(self, query_params, tags, size, custom_query, add_bool, **kwargs):
|
||||
query_created = False
|
||||
if "query" in query_params:
|
||||
@@ -177,85 +112,9 @@ class StorageBackend(ABC):
|
||||
message_class = "warning"
|
||||
return {"message": message, "class": message_class}
|
||||
|
||||
def parse_source(self, user, query_params, raise_error=False):
|
||||
source = None
|
||||
if "source" in query_params:
|
||||
source = query_params["source"]
|
||||
|
||||
if source in settings.SOURCES_RESTRICTED:
|
||||
if not user.has_perm("core.restricted_sources"):
|
||||
message = f"Access denied: {source}"
|
||||
if raise_error:
|
||||
raise QueryError(message)
|
||||
message_class = "danger"
|
||||
return {"message": message, "class": message_class}
|
||||
elif source not in settings.MAIN_SOURCES:
|
||||
message = f"Invalid source: {source}"
|
||||
if raise_error:
|
||||
raise QueryError(message)
|
||||
message_class = "danger"
|
||||
return {"message": message, "class": message_class}
|
||||
|
||||
if source == "all":
|
||||
source = None # the next block will populate it
|
||||
|
||||
if source:
|
||||
sources = [source]
|
||||
else:
|
||||
sources = list(settings.MAIN_SOURCES)
|
||||
if user.has_perm("core.restricted_sources"):
|
||||
for source_iter in settings.SOURCES_RESTRICTED:
|
||||
sources.append(source_iter)
|
||||
|
||||
if "all" in sources:
|
||||
sources.remove("all")
|
||||
|
||||
return sources
|
||||
|
||||
def parse_sort(self, query_params):
|
||||
sort = None
|
||||
if "sorting" in query_params:
|
||||
sorting = query_params["sorting"]
|
||||
if sorting not in ("asc", "desc", "none"):
|
||||
message = "Invalid sort"
|
||||
message_class = "danger"
|
||||
return {"message": message, "class": message_class}
|
||||
if sorting == "asc":
|
||||
sort = "ascending"
|
||||
elif sorting == "desc":
|
||||
sort = "descending"
|
||||
return sort
|
||||
|
||||
def parse_date_time(self, query_params):
|
||||
if set({"from_date", "to_date", "from_time", "to_time"}).issubset(
|
||||
query_params.keys()
|
||||
):
|
||||
from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
|
||||
to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
|
||||
from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ")
|
||||
to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ")
|
||||
|
||||
return (from_ts, to_ts)
|
||||
return (None, None)
|
||||
|
||||
def parse_sentiment(self, query_params):
|
||||
sentiment = None
|
||||
if "check_sentiment" in query_params:
|
||||
if "sentiment_method" not in query_params:
|
||||
message = "No sentiment method"
|
||||
message_class = "danger"
|
||||
return {"message": message, "class": message_class}
|
||||
if "sentiment" in query_params:
|
||||
sentiment = query_params["sentiment"]
|
||||
try:
|
||||
sentiment = float(sentiment)
|
||||
except ValueError:
|
||||
message = "Sentiment is not a float"
|
||||
message_class = "danger"
|
||||
return {"message": message, "class": message_class}
|
||||
sentiment_method = query_params["sentiment_method"]
|
||||
|
||||
return (sentiment_method, sentiment)
|
||||
@abstractmethod
|
||||
def run_query(self, **kwargs):
|
||||
pass
|
||||
|
||||
def filter_blacklisted(self, user, response):
|
||||
"""
|
||||
|
||||
@@ -6,6 +6,14 @@ from django.conf import settings
|
||||
|
||||
from core.db import StorageBackend, add_defaults
|
||||
from core.db.processing import parse_druid
|
||||
from core.lib.parsing import (
|
||||
parse_date_time,
|
||||
parse_index,
|
||||
parse_sentiment,
|
||||
parse_size,
|
||||
parse_sort,
|
||||
parse_source,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -155,12 +163,12 @@ class DruidBackend(StorageBackend):
|
||||
else:
|
||||
sizes = settings.MAIN_SIZES
|
||||
if not size:
|
||||
size = self.parse_size(query_params, sizes)
|
||||
size = parse_size(query_params, sizes)
|
||||
if isinstance(size, dict):
|
||||
return size
|
||||
|
||||
# I - Index
|
||||
index = self.parse_index(request.user, query_params)
|
||||
index = parse_index(request.user, query_params)
|
||||
if isinstance(index, dict):
|
||||
return index
|
||||
|
||||
@@ -173,7 +181,7 @@ class DruidBackend(StorageBackend):
|
||||
return search_query
|
||||
|
||||
# S - Sources
|
||||
sources = self.parse_source(request.user, query_params)
|
||||
sources = parse_source(request.user, query_params)
|
||||
if isinstance(sources, dict):
|
||||
return sources
|
||||
total_count = len(sources)
|
||||
@@ -182,20 +190,20 @@ class DruidBackend(StorageBackend):
|
||||
add_in["src"] = sources
|
||||
|
||||
# R - Ranges
|
||||
from_ts, to_ts = self.parse_date_time(query_params)
|
||||
from_ts, to_ts = parse_date_time(query_params)
|
||||
if from_ts:
|
||||
addendum = f"{from_ts}/{to_ts}"
|
||||
search_query["intervals"] = [addendum]
|
||||
|
||||
# S - Sort
|
||||
sort = self.parse_sort(query_params)
|
||||
sort = parse_sort(query_params)
|
||||
if isinstance(sort, dict):
|
||||
return sort
|
||||
if sort:
|
||||
search_query["order"] = sort
|
||||
|
||||
# S - Sentiment
|
||||
sentiment_r = self.parse_sentiment(query_params)
|
||||
sentiment_r = parse_sentiment(query_params)
|
||||
if isinstance(sentiment_r, dict):
|
||||
return sentiment_r
|
||||
if sentiment_r:
|
||||
|
||||
@@ -10,6 +10,15 @@ from core.db import StorageBackend, add_defaults
|
||||
# from json import dumps
|
||||
# pp = lambda x: print(dumps(x, indent=2))
|
||||
from core.db.processing import parse_results
|
||||
from core.lib.parsing import (
|
||||
QueryError,
|
||||
parse_date_time,
|
||||
parse_index,
|
||||
parse_sentiment,
|
||||
parse_size,
|
||||
parse_sort,
|
||||
parse_source,
|
||||
)
|
||||
|
||||
|
||||
class ElasticsearchBackend(StorageBackend):
|
||||
@@ -126,14 +135,16 @@ class ElasticsearchBackend(StorageBackend):
|
||||
)
|
||||
return query
|
||||
|
||||
def construct_query(self, query, size, blank=False):
|
||||
def construct_query(self, query, size=None, blank=False):
|
||||
"""
|
||||
Accept some query parameters and construct an Elasticsearch query.
|
||||
"""
|
||||
query_base = {
|
||||
"size": size,
|
||||
# "size": size,
|
||||
"query": {"bool": {"must": []}},
|
||||
}
|
||||
if size:
|
||||
query_base["size"] = size
|
||||
query_string = {
|
||||
"query_string": {
|
||||
"query": query,
|
||||
@@ -163,8 +174,8 @@ class ElasticsearchBackend(StorageBackend):
|
||||
query_base["query"]["bool"]["must"].append(query_string)
|
||||
return query_base
|
||||
|
||||
def parse(self, response):
|
||||
parsed = parse_results(response)
|
||||
def parse(self, response, **kwargs):
|
||||
parsed = parse_results(response, **kwargs)
|
||||
return parsed
|
||||
|
||||
def run_query(self, user, search_query, **kwargs):
|
||||
@@ -186,6 +197,127 @@ class ElasticsearchBackend(StorageBackend):
|
||||
return err
|
||||
return response
|
||||
|
||||
async def async_run_query(self, user, search_query, **kwargs):
|
||||
"""
|
||||
Low level helper to run an ES query.
|
||||
Accept a user to pass it to the filter, so we can
|
||||
avoid filtering for superusers.
|
||||
Accept fields and size, for the fields we want to match and the
|
||||
number of results to return.
|
||||
"""
|
||||
index = kwargs.get("index")
|
||||
try:
|
||||
response = self.client.search(body=search_query, index=index)
|
||||
except RequestError as err:
|
||||
print("Elasticsearch error", err)
|
||||
return err
|
||||
except NotFoundError as err:
|
||||
print("Elasticsearch error", err)
|
||||
return err
|
||||
return response
|
||||
|
||||
async def schedule_query_results(self, rule_object):
|
||||
"""
|
||||
Helper to run a scheduled query with reduced functionality and async.
|
||||
"""
|
||||
|
||||
data = rule_object.parsed
|
||||
|
||||
if "tags" in data:
|
||||
tags = data["tags"]
|
||||
else:
|
||||
tags = []
|
||||
|
||||
if "query" in data:
|
||||
query = data["query"][0]
|
||||
data["query"] = query
|
||||
|
||||
result_map = {}
|
||||
|
||||
add_bool = []
|
||||
add_top = []
|
||||
if "source" in data:
|
||||
total_count = len(data["source"])
|
||||
total_sources = len(settings.MAIN_SOURCES) + len(
|
||||
settings.SOURCES_RESTRICTED
|
||||
)
|
||||
if total_count != total_sources:
|
||||
add_top_tmp = {"bool": {"should": []}}
|
||||
for source_iter in data["source"]:
|
||||
add_top_tmp["bool"]["should"].append(
|
||||
{"match_phrase": {"src": source_iter}}
|
||||
)
|
||||
add_top.append(add_top_tmp)
|
||||
for field, values in data.items():
|
||||
if field not in ["source", "index", "tags", "query", "sentiment"]:
|
||||
for value in values:
|
||||
add_top.append({"match": {field: value}})
|
||||
search_query = self.parse_query(data, tags, None, False, add_bool)
|
||||
self.add_bool(search_query, add_bool)
|
||||
self.add_top(search_query, add_top)
|
||||
if "sentiment" in data:
|
||||
search_query["aggs"] = {
|
||||
"avg_sentiment": {
|
||||
"avg": {"field": "sentiment"},
|
||||
}
|
||||
}
|
||||
for index in data["index"]:
|
||||
|
||||
if "message" in search_query:
|
||||
self.log.error(f"Error parsing query: {search_query['message']}")
|
||||
continue
|
||||
response = await self.async_run_query(
|
||||
rule_object.user,
|
||||
search_query,
|
||||
index=index,
|
||||
)
|
||||
if isinstance(response, Exception):
|
||||
error = response.info["error"]["root_cause"][0]["reason"]
|
||||
self.log.error(f"Error running scheduled search: {error}")
|
||||
raise QueryError(error)
|
||||
if len(response["hits"]["hits"]) == 0:
|
||||
# No results, skip
|
||||
continue
|
||||
aggs, response = self.parse(response, aggs=True)
|
||||
if "message" in response:
|
||||
self.log.error(f"Error running scheduled search: {response['message']}")
|
||||
continue
|
||||
result_map[index] = (aggs, response)
|
||||
|
||||
# Average aggregation check
|
||||
# Could probably do this in elasticsearch
|
||||
for index, (aggs, result) in result_map.items():
|
||||
# Default to true, if no aggs are found, we still want to match
|
||||
match = True
|
||||
for agg_name, (operator, number) in rule_object.aggs.items():
|
||||
if agg_name in aggs:
|
||||
agg_value = aggs[agg_name]["value"]
|
||||
|
||||
# TODO: simplify this, match is default to True
|
||||
if operator == ">":
|
||||
if agg_value > number:
|
||||
match = True
|
||||
else:
|
||||
match = False
|
||||
elif operator == "<":
|
||||
if agg_value < number:
|
||||
match = True
|
||||
else:
|
||||
match = False
|
||||
elif operator == "=":
|
||||
if agg_value == number:
|
||||
match = True
|
||||
else:
|
||||
match = False
|
||||
else:
|
||||
match = False
|
||||
else:
|
||||
# No aggregation found, but it is required
|
||||
match = False
|
||||
result_map[index][0][agg_name]["match"] = match
|
||||
|
||||
return result_map
|
||||
|
||||
def query_results(
|
||||
self,
|
||||
request,
|
||||
@@ -224,12 +356,12 @@ class ElasticsearchBackend(StorageBackend):
|
||||
else:
|
||||
sizes = settings.MAIN_SIZES
|
||||
if not size:
|
||||
size = self.parse_size(query_params, sizes)
|
||||
size = parse_size(query_params, sizes)
|
||||
if isinstance(size, dict):
|
||||
return size
|
||||
|
||||
# I - Index
|
||||
index = self.parse_index(request.user, query_params)
|
||||
index = parse_index(request.user, query_params)
|
||||
if isinstance(index, dict):
|
||||
return index
|
||||
|
||||
@@ -242,7 +374,7 @@ class ElasticsearchBackend(StorageBackend):
|
||||
return search_query
|
||||
|
||||
# S - Sources
|
||||
sources = self.parse_source(request.user, query_params)
|
||||
sources = parse_source(request.user, query_params)
|
||||
if isinstance(sources, dict):
|
||||
return sources
|
||||
total_count = len(sources)
|
||||
@@ -257,7 +389,7 @@ class ElasticsearchBackend(StorageBackend):
|
||||
|
||||
# R - Ranges
|
||||
# date_query = False
|
||||
from_ts, to_ts = self.parse_date_time(query_params)
|
||||
from_ts, to_ts = parse_date_time(query_params)
|
||||
if from_ts:
|
||||
range_query = {
|
||||
"range": {
|
||||
@@ -270,7 +402,7 @@ class ElasticsearchBackend(StorageBackend):
|
||||
add_top.append(range_query)
|
||||
|
||||
# S - Sort
|
||||
sort = self.parse_sort(query_params)
|
||||
sort = parse_sort(query_params)
|
||||
if isinstance(sort, dict):
|
||||
return sort
|
||||
if sort:
|
||||
@@ -286,7 +418,7 @@ class ElasticsearchBackend(StorageBackend):
|
||||
search_query["sort"] = sorting
|
||||
|
||||
# S - Sentiment
|
||||
sentiment_r = self.parse_sentiment(query_params)
|
||||
sentiment_r = parse_sentiment(query_params)
|
||||
if isinstance(sentiment_r, dict):
|
||||
return sentiment_r
|
||||
if sentiment_r:
|
||||
|
||||
@@ -58,7 +58,7 @@ def annotate_results(results):
|
||||
item["num_chans"] = num_chans[item["nick"]]
|
||||
|
||||
|
||||
def parse_results(results):
|
||||
def parse_results(results, aggs):
|
||||
results_parsed = []
|
||||
stringify = ["host", "channel"]
|
||||
if "hits" in results.keys():
|
||||
@@ -110,6 +110,14 @@ def parse_results(results):
|
||||
else:
|
||||
element["time"] = time
|
||||
results_parsed.append(element)
|
||||
if aggs:
|
||||
aggregations = {}
|
||||
if "aggregations" in results:
|
||||
for field in ["avg_sentiment"]: # Add other number fields here
|
||||
if field in results["aggregations"]:
|
||||
aggregations[field] = results["aggregations"][field]
|
||||
return (aggregations, results_parsed)
|
||||
|
||||
return results_parsed
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user