From 6bfa0aa73b5e395b035728ba19df95acaaf61dd8 Mon Sep 17 00:00:00 2001
From: Mark Veidemanis
Date: Sun, 15 Jan 2023 17:59:12 +0000
Subject: [PATCH] Implement running scheduled rules and check aggregations

---
 core/__init__.py                              |   5 +
 core/db/__init__.py                           | 147 +-----------
 core/db/druid.py                              |  20 +-
 core/db/elastic.py                            | 152 +++++++++++-
 core/db/processing.py                         |  10 +-
 core/forms.py                                 |   7 +-
 core/lib/parsing.py                           | 149 ++++++++++++
 core/lib/rules.py                             | 218 ++++++++++++------
 core/management/commands/processing.py        |  65 +++++-
 core/management/commands/scheduling.py        |  34 +--
 ...options_notificationrule_match_and_more.py |  27 +++
 .../0019_alter_notificationrule_match.py      |  18 ++
 core/models.py                                |   2 +
 core/views/notifications.py                   |   2 +-
 requirements.txt                              |   2 +-
 15 files changed, 600 insertions(+), 258 deletions(-)
 create mode 100644 core/lib/parsing.py
 create mode 100644 core/migrations/0018_alter_perms_options_notificationrule_match_and_more.py
 create mode 100644 core/migrations/0019_alter_notificationrule_match.py

diff --git a/core/__init__.py b/core/__init__.py
index 9302e9b..ce498ab 100644
--- a/core/__init__.py
+++ b/core/__init__.py
@@ -1,7 +1,12 @@
+import os
+
 import stripe
 from django.conf import settings
 from redis import StrictRedis
 
+os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
+
+
 r = StrictRedis(unix_socket_path="/var/run/socks/redis.sock", db=0)
 
 if settings.STRIPE_TEST:
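Note on the new environment flag: Django raises SynchronousOnlyOperation when the synchronous ORM is touched from inside a running event loop, and DJANGO_ALLOW_ASYNC_UNSAFE=true simply disables that guard for the async scheduling code added later in this patch. A minimal sketch of the two options; the model argument is illustrative:

```python
# Two ways to touch the sync ORM from async code. Any Django model
# works the same way; NotificationRule is passed in to keep this
# sketch self-contained.
import os

from asgiref.sync import sync_to_async

os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"  # 1) disable the guard


async def fetch_enabled(NotificationRule):
    # 2) safer: run the ORM call in a worker thread, the pattern
    #    scheduling.py below uses as sync_to_async(list)(...).
    return await sync_to_async(list)(
        NotificationRule.objects.filter(enabled=True)
    )
```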
diff --git a/core/db/__init__.py b/core/db/__init__.py
index 2ad08cb..fe1fa9c 100644
--- a/core/db/__init__.py
+++ b/core/db/__init__.py
@@ -2,7 +2,6 @@ import random
 import string
 import time
 from abc import ABC, abstractmethod
-from datetime import datetime
 from math import floor, log10
 
 import orjson
@@ -50,10 +49,6 @@ def dedup_list(data, check_keys):
     return out
 
 
-class QueryError(Exception):
-    pass
-
-
 class StorageBackend(ABC):
     def __init__(self, name):
         self.log = logs.get_logger(name)
@@ -82,66 +77,6 @@ class StorageBackend(ABC):
     def construct_query(self, **kwargs):
         pass
 
-    @abstractmethod
-    def run_query(self, **kwargs):
-        pass
-
-    def parse_size(self, query_params, sizes):
-        if "size" in query_params:
-            size = query_params["size"]
-            if size not in sizes:
-                message = "Size is not permitted"
-                message_class = "danger"
-                return {"message": message, "class": message_class}
-            size = int(size)
-        else:
-            size = 15
-
-        return size
-
-    def parse_index(self, user, query_params, raise_error=False):
-        if "index" in query_params:
-            index = query_params["index"]
-            if index == "main":
-                index = settings.INDEX_MAIN
-            else:
-                if not user.has_perm(f"core.index_{index}"):
-                    message = f"Not permitted to search by this index: {index}"
-                    if raise_error:
-                        raise QueryError(message)
-                    message_class = "danger"
-                    return {
-                        "message": message,
-                        "class": message_class,
-                    }
-                if index == "meta":
-                    index = settings.INDEX_META
-                elif index == "internal":
-                    index = settings.INDEX_INT
-                elif index == "restricted":
-                    if not user.has_perm("core.restricted_sources"):
-                        message = f"Not permitted to search by this index: {index}"
-                        if raise_error:
-                            raise QueryError(message)
-                        message_class = "danger"
-                        return {
-                            "message": message,
-                            "class": message_class,
-                        }
-                    index = settings.INDEX_RESTRICTED
-                else:
-                    message = f"Index is not valid: {index}"
-                    if raise_error:
-                        raise QueryError(message)
-                    message_class = "danger"
-                    return {
-                        "message": message,
-                        "class": message_class,
-                    }
-        else:
-            index = settings.INDEX_MAIN
-        return index
-
     def parse_query(self, query_params, tags, size, custom_query, add_bool, **kwargs):
         query_created = False
         if "query" in query_params:
@@ -177,85 +112,9 @@ class StorageBackend(ABC):
             message_class = "warning"
             return {"message": message, "class": message_class}
 
-    def parse_source(self, user, query_params, raise_error=False):
-        source = None
-        if "source" in query_params:
-            source = query_params["source"]
-
-            if source in settings.SOURCES_RESTRICTED:
-                if not user.has_perm("core.restricted_sources"):
-                    message = f"Access denied: {source}"
-                    if raise_error:
-                        raise QueryError(message)
-                    message_class = "danger"
-                    return {"message": message, "class": message_class}
-            elif source not in settings.MAIN_SOURCES:
-                message = f"Invalid source: {source}"
-                if raise_error:
-                    raise QueryError(message)
-                message_class = "danger"
-                return {"message": message, "class": message_class}
-
-            if source == "all":
-                source = None  # the next block will populate it
-
-        if source:
-            sources = [source]
-        else:
-            sources = list(settings.MAIN_SOURCES)
-            if user.has_perm("core.restricted_sources"):
-                for source_iter in settings.SOURCES_RESTRICTED:
-                    sources.append(source_iter)
-
-        if "all" in sources:
-            sources.remove("all")
-
-        return sources
-
-    def parse_sort(self, query_params):
-        sort = None
-        if "sorting" in query_params:
-            sorting = query_params["sorting"]
-            if sorting not in ("asc", "desc", "none"):
-                message = "Invalid sort"
-                message_class = "danger"
-                return {"message": message, "class": message_class}
-            if sorting == "asc":
-                sort = "ascending"
-            elif sorting == "desc":
-                sort = "descending"
-        return sort
-
-    def parse_date_time(self, query_params):
-        if set({"from_date", "to_date", "from_time", "to_time"}).issubset(
-            query_params.keys()
-        ):
-            from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z"
-            to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z"
-            from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ")
-            to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ")
-
-            return (from_ts, to_ts)
-        return (None, None)
-
-    def parse_sentiment(self, query_params):
-        sentiment = None
-        if "check_sentiment" in query_params:
-            if "sentiment_method" not in query_params:
-                message = "No sentiment method"
-                message_class = "danger"
-                return {"message": message, "class": message_class}
-            if "sentiment" in query_params:
-                sentiment = query_params["sentiment"]
-                try:
-                    sentiment = float(sentiment)
-                except ValueError:
-                    message = "Sentiment is not a float"
-                    message_class = "danger"
-                    return {"message": message, "class": message_class}
-            sentiment_method = query_params["sentiment_method"]
-
-            return (sentiment_method, sentiment)
+    @abstractmethod
+    def run_query(self, **kwargs):
+        pass
 
     def filter_blacklisted(self, user, response):
         """
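The parse_* helpers move from StorageBackend methods to plain functions in the new core/lib/parsing.py, which lets core/lib/rules.py call them without importing the storage backend (the circular import mentioned in core/forms.py). The calling convention is unchanged: each returns a usable value, or an error dict for the template layer, or raises QueryError when raise_error=True. A sketch, assuming a configured Django settings module; the size list and index name are illustrative:

```python
from core.lib.parsing import QueryError, parse_index, parse_size

# Sizes normally come from settings.MAIN_SIZES; values here are examples.
size = parse_size({"size": "15"}, sizes=["15", "50"])
if isinstance(size, dict):
    print(size["message"])  # an error dict, not a size


class AnyUser:
    """Stand-in for a django User; only has_perm() is consulted."""

    def has_perm(self, perm):
        return True


try:
    index = parse_index(AnyUser(), {"index": "meta"}, raise_error=True)
except QueryError as e:  # raised instead of returning the error dict
    print(f"Rejected: {e}")
```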
diff --git a/core/db/druid.py b/core/db/druid.py
index 2b7c797..0a23752 100644
--- a/core/db/druid.py
+++ b/core/db/druid.py
@@ -6,6 +6,14 @@ from django.conf import settings
 
 from core.db import StorageBackend, add_defaults
 from core.db.processing import parse_druid
+from core.lib.parsing import (
+    parse_date_time,
+    parse_index,
+    parse_sentiment,
+    parse_size,
+    parse_sort,
+    parse_source,
+)
 
 logger = logging.getLogger(__name__)
@@ -155,12 +163,12 @@
         else:
             sizes = settings.MAIN_SIZES
         if not size:
-            size = self.parse_size(query_params, sizes)
+            size = parse_size(query_params, sizes)
         if isinstance(size, dict):
             return size
 
         # I - Index
-        index = self.parse_index(request.user, query_params)
+        index = parse_index(request.user, query_params)
         if isinstance(index, dict):
             return index
 
@@ -173,7 +181,7 @@
             return search_query
 
         # S - Sources
-        sources = self.parse_source(request.user, query_params)
+        sources = parse_source(request.user, query_params)
         if isinstance(sources, dict):
             return sources
         total_count = len(sources)
@@ -182,20 +190,20 @@
         add_in["src"] = sources
 
         # R - Ranges
-        from_ts, to_ts = self.parse_date_time(query_params)
+        from_ts, to_ts = parse_date_time(query_params)
         if from_ts:
             addendum = f"{from_ts}/{to_ts}"
             search_query["intervals"] = [addendum]
 
         # S - Sort
-        sort = self.parse_sort(query_params)
+        sort = parse_sort(query_params)
         if isinstance(sort, dict):
             return sort
         if sort:
             search_query["order"] = sort
 
         # S - Sentiment
-        sentiment_r = self.parse_sentiment(query_params)
+        sentiment_r = parse_sentiment(query_params)
         if isinstance(sentiment_r, dict):
             return sentiment_r
         if sentiment_r:
diff --git a/core/db/elastic.py b/core/db/elastic.py
index 45165d0..7e5be08 100644
--- a/core/db/elastic.py
+++ b/core/db/elastic.py
@@ -10,6 +10,15 @@ from core.db import StorageBackend, add_defaults
 
 # from json import dumps
 # pp = lambda x: print(dumps(x, indent=2))
 from core.db.processing import parse_results
+from core.lib.parsing import (
+    QueryError,
+    parse_date_time,
+    parse_index,
+    parse_sentiment,
+    parse_size,
+    parse_sort,
+    parse_source,
+)
 
 
 class ElasticsearchBackend(StorageBackend):
@@ -126,14 +135,16 @@
         )
         return query
 
-    def construct_query(self, query, size, blank=False):
+    def construct_query(self, query, size=None, blank=False):
         """
         Accept some query parameters and construct an Elasticsearch query.
         """
         query_base = {
-            "size": size,
+            # "size": size,
             "query": {"bool": {"must": []}},
         }
+        if size:
+            query_base["size"] = size
         query_string = {
             "query_string": {
                 "query": query,
@@ -163,8 +174,8 @@
             query_base["query"]["bool"]["must"].append(query_string)
         return query_base
 
-    def parse(self, response):
-        parsed = parse_results(response)
+    def parse(self, response, **kwargs):
+        parsed = parse_results(response, **kwargs)
         return parsed
 
     def run_query(self, user, search_query, **kwargs):
@@ -186,6 +197,127 @@
             return err
         return response
 
+    async def async_run_query(self, user, search_query, **kwargs):
+        """
+        Low-level helper to run an ES query.
+        Accept a user to pass to the filter, so we can avoid
+        filtering for superusers.
+        Accept fields and size, for the fields we want to match and
+        the number of results to return.
+        """
+        index = kwargs.get("index")
+        try:
+            response = self.client.search(body=search_query, index=index)
+        except RequestError as err:
+            print("Elasticsearch error", err)
+            return err
+        except NotFoundError as err:
+            print("Elasticsearch error", err)
+            return err
+        return response
+
+    async def schedule_query_results(self, rule_object):
+        """
+        Async helper to run a scheduled query with reduced functionality.
+        """
+        data = rule_object.parsed
+
+        if "tags" in data:
+            tags = data["tags"]
+        else:
+            tags = []
+
+        if "query" in data:
+            query = data["query"][0]
+            data["query"] = query
+
+        result_map = {}
+
+        add_bool = []
+        add_top = []
+        if "source" in data:
+            total_count = len(data["source"])
+            total_sources = len(settings.MAIN_SOURCES) + len(
+                settings.SOURCES_RESTRICTED
+            )
+            if total_count != total_sources:
+                add_top_tmp = {"bool": {"should": []}}
+                for source_iter in data["source"]:
+                    add_top_tmp["bool"]["should"].append(
+                        {"match_phrase": {"src": source_iter}}
+                    )
+                add_top.append(add_top_tmp)
+        for field, values in data.items():
+            if field not in ["source", "index", "tags", "query", "sentiment"]:
+                for value in values:
+                    add_top.append({"match": {field: value}})
+        search_query = self.parse_query(data, tags, None, False, add_bool)
+        self.add_bool(search_query, add_bool)
+        self.add_top(search_query, add_top)
+        if "sentiment" in data:
+            search_query["aggs"] = {
+                "avg_sentiment": {
+                    "avg": {"field": "sentiment"},
+                }
+            }
+        for index in data["index"]:
+            if "message" in search_query:
+                self.log.error(f"Error parsing query: {search_query['message']}")
+                continue
+            response = await self.async_run_query(
+                rule_object.user,
+                search_query,
+                index=index,
+            )
+            if isinstance(response, Exception):
+                error = response.info["error"]["root_cause"][0]["reason"]
+                self.log.error(f"Error running scheduled search: {error}")
+                raise QueryError(error)
+            if len(response["hits"]["hits"]) == 0:
+                # No results, skip
+                continue
+            aggs, response = self.parse(response, aggs=True)
+            if "message" in response:
+                self.log.error(
+                    f"Error running scheduled search: {response['message']}"
+                )
+                continue
+            result_map[index] = (aggs, response)
+
+        # Average aggregation check
+        # Could probably do this in elasticsearch
+        for index, (aggs, result) in result_map.items():
+            # Default to true; if no aggs are found, we still want to match
+            match = True
+            for agg_name, (operator, number) in rule_object.aggs.items():
+                if agg_name in aggs:
+                    agg_value = aggs[agg_name]["value"]
+
+                    # TODO: simplify this; match defaults to True
+                    if operator == ">":
+                        if agg_value > number:
+                            match = True
+                        else:
+                            match = False
+                    elif operator == "<":
+                        if agg_value < number:
+                            match = True
+                        else:
+                            match = False
+                    elif operator == "=":
+                        if agg_value == number:
+                            match = True
+                        else:
+                            match = False
+                    else:
+                        match = False
+                else:
+                    # No aggregation found, but it is required
+                    match = False
+                # setdefault avoids a KeyError when the aggregation
+                # is required but missing from the response
+                result_map[index][0].setdefault(agg_name, {})["match"] = match
+
+        return result_map
+
     def query_results(
         self,
         request,
@@ -224,12 +356,12 @@
         else:
             sizes = settings.MAIN_SIZES
         if not size:
-            size = self.parse_size(query_params, sizes)
+            size = parse_size(query_params, sizes)
         if isinstance(size, dict):
             return size
 
         # I - Index
-        index = self.parse_index(request.user, query_params)
+        index = parse_index(request.user, query_params)
         if isinstance(index, dict):
             return index
 
@@ -242,7 +374,7 @@
             return search_query
 
         # S - Sources
-        sources = self.parse_source(request.user, query_params)
+        sources = parse_source(request.user, query_params)
         if isinstance(sources, dict):
             return sources
         total_count = len(sources)
@@ -257,7 +389,7 @@
 
         # R - Ranges
         # date_query = False
-        from_ts, to_ts = self.parse_date_time(query_params)
+        from_ts, to_ts = parse_date_time(query_params)
         if from_ts:
             range_query = {
                 "range": {
@@ -270,7 +402,7 @@ class ElasticsearchBackend(StorageBackend):
             add_top.append(range_query)
 
         # S - Sort
-        sort = self.parse_sort(query_params)
+        sort = parse_sort(query_params)
         if isinstance(sort, dict):
             return sort
         if sort:
@@ -286,7 +418,7 @@ class ElasticsearchBackend(StorageBackend):
             search_query["sort"] = sorting
 
         # S - Sentiment
-        sentiment_r = self.parse_sentiment(query_params)
+        sentiment_r = parse_sentiment(query_params)
         if isinstance(sentiment_r, dict):
             return sentiment_r
         if sentiment_r:
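schedule_query_results returns a per-index map of (aggregations, hits), and the aggregation-check loop then applies each (operator, threshold) pair from rule_object.aggs. A condensed, self-contained sketch of that comparison, equivalent to the if/elif chain above; the values are illustrative:

```python
# Condensed equivalent of the aggregation check, using the same data
# shapes: aggs as returned by parse(..., aggs=True) and rule_aggs as
# built by validate_schedule_fields() below.
import operator as op

OPS = {">": op.gt, "<": op.lt, "=": op.eq}

aggs = {"avg_sentiment": {"value": 0.14}}   # from Elasticsearch
rule_aggs = {"avg_sentiment": (">", 0.02)}  # from the rule body

for name, (sym, threshold) in rule_aggs.items():
    matched = name in aggs and OPS[sym](aggs[name]["value"], threshold)
    aggs.setdefault(name, {})["match"] = matched

print(aggs)  # {'avg_sentiment': {'value': 0.14, 'match': True}}
```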
+ """ + + data = rule_object.parsed + + if "tags" in data: + tags = data["tags"] + else: + tags = [] + + if "query" in data: + query = data["query"][0] + data["query"] = query + + result_map = {} + + add_bool = [] + add_top = [] + if "source" in data: + total_count = len(data["source"]) + total_sources = len(settings.MAIN_SOURCES) + len( + settings.SOURCES_RESTRICTED + ) + if total_count != total_sources: + add_top_tmp = {"bool": {"should": []}} + for source_iter in data["source"]: + add_top_tmp["bool"]["should"].append( + {"match_phrase": {"src": source_iter}} + ) + add_top.append(add_top_tmp) + for field, values in data.items(): + if field not in ["source", "index", "tags", "query", "sentiment"]: + for value in values: + add_top.append({"match": {field: value}}) + search_query = self.parse_query(data, tags, None, False, add_bool) + self.add_bool(search_query, add_bool) + self.add_top(search_query, add_top) + if "sentiment" in data: + search_query["aggs"] = { + "avg_sentiment": { + "avg": {"field": "sentiment"}, + } + } + for index in data["index"]: + + if "message" in search_query: + self.log.error(f"Error parsing query: {search_query['message']}") + continue + response = await self.async_run_query( + rule_object.user, + search_query, + index=index, + ) + if isinstance(response, Exception): + error = response.info["error"]["root_cause"][0]["reason"] + self.log.error(f"Error running scheduled search: {error}") + raise QueryError(error) + if len(response["hits"]["hits"]) == 0: + # No results, skip + continue + aggs, response = self.parse(response, aggs=True) + if "message" in response: + self.log.error(f"Error running scheduled search: {response['message']}") + continue + result_map[index] = (aggs, response) + + # Average aggregation check + # Could probably do this in elasticsearch + for index, (aggs, result) in result_map.items(): + # Default to true, if no aggs are found, we still want to match + match = True + for agg_name, (operator, number) in rule_object.aggs.items(): + if agg_name in aggs: + agg_value = aggs[agg_name]["value"] + + # TODO: simplify this, match is default to True + if operator == ">": + if agg_value > number: + match = True + else: + match = False + elif operator == "<": + if agg_value < number: + match = True + else: + match = False + elif operator == "=": + if agg_value == number: + match = True + else: + match = False + else: + match = False + else: + # No aggregation found, but it is required + match = False + result_map[index][0][agg_name]["match"] = match + + return result_map + def query_results( self, request, @@ -224,12 +356,12 @@ class ElasticsearchBackend(StorageBackend): else: sizes = settings.MAIN_SIZES if not size: - size = self.parse_size(query_params, sizes) + size = parse_size(query_params, sizes) if isinstance(size, dict): return size # I - Index - index = self.parse_index(request.user, query_params) + index = parse_index(request.user, query_params) if isinstance(index, dict): return index @@ -242,7 +374,7 @@ class ElasticsearchBackend(StorageBackend): return search_query # S - Sources - sources = self.parse_source(request.user, query_params) + sources = parse_source(request.user, query_params) if isinstance(sources, dict): return sources total_count = len(sources) @@ -257,7 +389,7 @@ class ElasticsearchBackend(StorageBackend): # R - Ranges # date_query = False - from_ts, to_ts = self.parse_date_time(query_params) + from_ts, to_ts = parse_date_time(query_params) if from_ts: range_query = { "range": { @@ -270,7 +402,7 @@ class 
diff --git a/core/forms.py b/core/forms.py
index 8a82032..1e4dafb 100644
--- a/core/forms.py
+++ b/core/forms.py
@@ -3,7 +3,8 @@ from django.contrib.auth.forms import UserCreationForm
 from django.core.exceptions import FieldDoesNotExist
 from django.forms import ModelForm
 
-from core.db import QueryError
+from core.db.storage import db
+from core.lib.parsing import QueryError
 from core.lib.rules import NotificationRuleData, RuleParseError
 
 from .models import NotificationRule, NotificationSettings, User
@@ -107,7 +108,9 @@ class NotificationRuleForm(RestrictedFormMixin, ModelForm):
     def clean(self):
         cleaned_data = super(NotificationRuleForm, self).clean()
         try:
-            parsed_data = NotificationRuleData(self.request.user, cleaned_data)
+            # Passing db to avoid circular import
+            parsed_data = NotificationRuleData(self.request.user, cleaned_data, db=db)
+            parsed_data.test_schedule()
         except RuleParseError as e:
             self.add_error(e.field, f"Parsing error: {e}")
             return
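The form now exercises the scheduled-query path at validation time: test_schedule() wraps the async schedule_query_results with asgiref's async_to_sync, so a broken rule fails at save time rather than on its first scheduled run. Roughly, in a sketch that assumes the project's modules are importable and with illustrative rule data:

```python
# Sketch of the validation flow. A plain dict means "not running live"
# to NotificationRuleData (self.object stays None).
from core.db.storage import db
from core.lib.rules import NotificationRuleData, RuleParseError


def validate_rule(user, cleaned_data):
    try:
        parsed = NotificationRuleData(user, cleaned_data, db=db)
        parsed.test_schedule()  # async_to_sync(db.schedule_query_results)
    except RuleParseError as e:
        return {e.field: str(e)}  # field -> error, as the form reports it
    return {}


# "irc" is an example source; valid values come from settings.MAIN_SOURCES.
errors = validate_rule(user, {"data": "index: main\nsource: irc", "interval": 0})
```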
"message": message, + "class": message_class, + } + index = settings.INDEX_RESTRICTED + else: + message = f"Index is not valid: {index}" + if raise_error: + raise QueryError(message) + message_class = "danger" + return { + "message": message, + "class": message_class, + } + else: + index = settings.INDEX_MAIN + return index + + +def parse_source(user, query_params, raise_error=False): + source = None + if "source" in query_params: + source = query_params["source"] + + if source in settings.SOURCES_RESTRICTED: + if not user.has_perm("core.restricted_sources"): + message = f"Access denied: {source}" + if raise_error: + raise QueryError(message) + message_class = "danger" + return {"message": message, "class": message_class} + elif source not in settings.MAIN_SOURCES: + message = f"Invalid source: {source}" + if raise_error: + raise QueryError(message) + message_class = "danger" + return {"message": message, "class": message_class} + + if source == "all": + source = None # the next block will populate it + + if source: + sources = [source] + else: + sources = list(settings.MAIN_SOURCES) + if user.has_perm("core.restricted_sources"): + for source_iter in settings.SOURCES_RESTRICTED: + sources.append(source_iter) + + if "all" in sources: + sources.remove("all") + + return sources + + +def parse_sort(query_params): + sort = None + if "sorting" in query_params: + sorting = query_params["sorting"] + if sorting not in ("asc", "desc", "none"): + message = "Invalid sort" + message_class = "danger" + return {"message": message, "class": message_class} + if sorting == "asc": + sort = "ascending" + elif sorting == "desc": + sort = "descending" + return sort + + +def parse_date_time(query_params): + if set({"from_date", "to_date", "from_time", "to_time"}).issubset( + query_params.keys() + ): + from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z" + to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z" + from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ") + to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ") + + return (from_ts, to_ts) + return (None, None) + + +def parse_sentiment(query_params): + sentiment = None + if "check_sentiment" in query_params: + if "sentiment_method" not in query_params: + message = "No sentiment method" + message_class = "danger" + return {"message": message, "class": message_class} + if "sentiment" in query_params: + sentiment = query_params["sentiment"] + try: + sentiment = float(sentiment) + except ValueError: + message = "Sentiment is not a float" + message_class = "danger" + return {"message": message, "class": message_class} + sentiment_method = query_params["sentiment_method"] + + return (sentiment_method, sentiment) diff --git a/core/lib/rules.py b/core/lib/rules.py index c554a28..6e2909b 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -2,16 +2,16 @@ from yaml import dump, load from yaml.parser import ParserError from yaml.scanner import ScannerError -from core.db.storage import db -from core.models import NotificationRule - try: from yaml import CDumper as Dumper from yaml import CLoader as Loader except ImportError: from yaml import Loader, Dumper +from asgiref.sync import async_to_sync + from core.lib.notify import sendmsg +from core.lib.parsing import parse_index, parse_source from core.util import logs log = logs.get_logger("rules") @@ -46,81 +46,150 @@ def rule_matched(rule, message, matched): sendmsg(rule.user, notify_message, **cast) -def process_rules(data): - all_rules = NotificationRule.objects.filter(enabled=True) - - 
diff --git a/core/lib/rules.py b/core/lib/rules.py
index c554a28..6e2909b 100644
--- a/core/lib/rules.py
+++ b/core/lib/rules.py
@@ -2,16 +2,16 @@ from yaml import dump, load
 from yaml.parser import ParserError
 from yaml.scanner import ScannerError
 
-from core.db.storage import db
-from core.models import NotificationRule
-
 try:
     from yaml import CDumper as Dumper
     from yaml import CLoader as Loader
 except ImportError:
     from yaml import Loader, Dumper
 
+from asgiref.sync import async_to_sync
+
 from core.lib.notify import sendmsg
+from core.lib.parsing import parse_index, parse_source
 from core.util import logs
 
 log = logs.get_logger("rules")
@@ -46,81 +46,150 @@ def rule_matched(rule, message, matched):
     sendmsg(rule.user, notify_message, **cast)
 
 
-def process_rules(data):
-    all_rules = NotificationRule.objects.filter(enabled=True)
-
-    for index, index_messages in data.items():
-        for message in index_messages:
-            for rule in all_rules:
-                parsed_rule = rule.parse()
-                matched = {}
-                if "index" not in parsed_rule:
-                    continue
-                if "source" not in parsed_rule:
-                    continue
-                rule_index = parsed_rule["index"]
-                rule_source = parsed_rule["source"]
-                if not type(rule_index) == list:
-                    rule_index = [rule_index]
-                if not type(rule_source) == list:
-                    rule_source = [rule_source]
-                if index not in rule_index:
-                    continue
-                if message["src"] not in rule_source:
-                    continue
-
-                matched["index"] = index
-                matched["source"] = message["src"]
-
-                rule_field_length = len(parsed_rule.keys())
-                matched_field_number = 0
-                for field, value in parsed_rule.items():
-                    if not type(value) == list:
-                        value = [value]
-                    if field == "src":
-                        continue
-                    if field == "tokens":
-                        for token in value:
-                            if "tokens" in message:
-                                if token in message["tokens"]:
-                                    matched_field_number += 1
-                                    matched[field] = token
-                                    # Break out of the token matching loop
-                                    break
-                        # Continue to next field
-                        continue
-
-                    # Allow partial matches for msg
-                    if field == "msg":
-                        for msg in value:
-                            if "msg" in message:
-                                if msg.lower() in message["msg"].lower():
-                                    matched_field_number += 1
-                                    matched[field] = msg
-                                    # Break out of the msg matching loop
-                                    break
-                        # Continue to next field
-                        continue
-                    if field in message and message[field] in value:
-                        matched_field_number += 1
-                        matched[field] = message[field]
-                if matched_field_number == rule_field_length - 2:
-                    rule_matched(rule, message, matched)
-
-
 class NotificationRuleData(object):
-    def __init__(self, user, cleaned_data):
+    def __init__(self, user, cleaned_data, db):
         self.user = user
+        self.object = None
+
+        # We are running live
+        if not isinstance(cleaned_data, dict):
+            self.object = cleaned_data
+            cleaned_data = cleaned_data.__dict__
+
         self.cleaned_data = cleaned_data
+        self.db = db
         self.data = self.cleaned_data.get("data")
         self.parsed = None
+        self.aggs = {}
 
         self.validate_user_permissions()
 
         self.parse_data()
+        self.ensure_list()
         self.validate_permissions()
+        self.validate_schedule_fields()
         self.validate_time_fields()
 
+    def store_match(self, index, match):
+        """
+        Store a match result.
+        """
+        if self.object.match is None:
+            self.object.match = {}
+        if not isinstance(self.object.match, dict):
+            self.object.match = {}
+
+        self.object.match[index] = match
+        self.object.save()
+        log.debug(f"Stored match: {index} - {match}")
+
+    async def run_schedule(self):
+        """
+        Run the scheduled query.
+        """
+        if self.db:
+            response = await self.db.schedule_query_results(self)
+            for index, (aggs, results) in response.items():
+                if not results:
+                    self.store_match(index, False)
+                    continue
+
+                aggs_for_index = []
+                for agg_name in self.aggs.keys():
+                    if agg_name in aggs:
+                        if "match" in aggs[agg_name]:
+                            aggs_for_index.append(aggs[agg_name]["match"])
+
+                # All required aggs are present
+                if len(aggs_for_index) == len(self.aggs.keys()):
+                    if all(aggs_for_index):
+                        self.store_match(index, True)
+                        continue
+                self.store_match(index, False)
+
+    def test_schedule(self):
+        """
+        Test the schedule query to ensure it is valid.
+        """
+        if self.db:
+            sync_schedule = async_to_sync(self.db.schedule_query_results)
+            sync_schedule(self)
+
+    def validate_schedule_fields(self):
+        """
+        Ensure schedule fields are valid.
+        index: can be a list, it will schedule one search per index.
+        source: can be a list, it will be the filter for each search.
+        tokens: can be a list, it will ensure the message matches any token.
+        msg: can be a list, it will ensure the message contains any msg.
+        No other fields can be lists containing more than one item.
+        """
+        is_schedule = self.is_schedule
+
+        if is_schedule:
+            allowed_list_fields = ["index", "source", "tokens", "msg"]
+            for field, value in self.parsed.items():
+                if field not in allowed_list_fields:
+                    if len(value) > 1:
+                        raise RuleParseError(
+                            (
+                                f"For scheduled rules, field {field} cannot contain "
+                                "more than one item"
+                            ),
+                            "data",
+                        )
+                    if len(str(value[0])) == 0:
+                        raise RuleParseError(f"Field {field} cannot be empty", "data")
+            if "sentiment" in self.parsed:
+                sentiment = str(self.parsed["sentiment"][0])
+                sentiment = sentiment.strip()
+                if sentiment[0] not in [">", "<", "="]:
+                    raise RuleParseError(
+                        (
+                            "Sentiment field must be a comparison operator and then a "
+                            "float: >0.02"
+                        ),
+                        "data",
+                    )
+                operator = sentiment[0]
+                number = sentiment[1:]
+
+                try:
+                    number = float(number)
+                except ValueError:
+                    raise RuleParseError(
+                        (
+                            "Sentiment field must be a comparison operator and then a "
+                            "float: >0.02"
+                        ),
+                        "data",
+                    )
+                self.aggs["avg_sentiment"] = (operator, number)
+
+        else:
+            if "query" in self.parsed:
+                raise RuleParseError(
+                    "Field query cannot be used with on-demand rules", "data"
+                )
+            if "tags" in self.parsed:
+                raise RuleParseError(
+                    "Field tags cannot be used with on-demand rules", "data"
+                )
+
+    @property
+    def is_schedule(self):
+        if "interval" in self.cleaned_data:
+            if self.cleaned_data["interval"] != 0:
+                return True
+        return False
+
+    def ensure_list(self):
+        """
+        Ensure all values are lists.
+        """
+        for field, value in self.parsed.items():
+            if not isinstance(value, list):
+                self.parsed[field] = [value]
+
     def validate_user_permissions(self):
         """
         Ensure the user can use notification rules.
@@ -161,7 +230,6 @@ class NotificationRuleData(object):
                 "window",
             )
         window_seconds = window_number * SECONDS_PER_UNIT[window_unit]
-        print("Window seconds", window_seconds)
         if window_seconds > MAX_WINDOW:
             raise RuleParseError(
                 f"Window cannot be larger than {MAX_WINDOW} seconds (30 days)",
@@ -176,24 +244,24 @@ class NotificationRuleData(object):
             index = self.parsed["index"]
             if type(index) == list:
                 for i in index:
-                    db.parse_index(self.user, {"index": i}, raise_error=True)
-            else:
-                db.parse_index(self.user, {"index": index}, raise_error=True)
+                    parse_index(self.user, {"index": i}, raise_error=True)
+            # else:
+            #     db.parse_index(self.user, {"index": index}, raise_error=True)
         else:
             # Get the default value for the user if not present
-            index = db.parse_index(self.user, {}, raise_error=True)
+            index = parse_index(self.user, {}, raise_error=True)
             self.parsed["index"] = index
 
         if "source" in self.parsed:
             source = self.parsed["source"]
             if type(source) == list:
                 for i in source:
-                    db.parse_source(self.user, {"source": i}, raise_error=True)
-            else:
-                db.parse_source(self.user, {"source": source}, raise_error=True)
+                    parse_source(self.user, {"source": i}, raise_error=True)
+            # else:
+            #     parse_source(self.user, {"source": source}, raise_error=True)
         else:
             # Get the default value for the user if not present
-            source = db.parse_source(self.user, {}, raise_error=True)
+            source = parse_source(self.user, {}, raise_error=True)
             self.parsed["source"] = source
 
     def parse_data(self):
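For reference, a hypothetical rule body that exercises the new scheduled-rule validation: index and source may be lists (one search per index), sentiment must parse as a comparison operator plus a float, and query is only accepted because the rule has a non-zero interval. The field values are illustrative:

```python
# Hypothetical scheduled-rule data; ensure_list() wraps the scalar
# fields, and validate_schedule_fields() turns "sentiment" into
# self.aggs = {"avg_sentiment": (">", 0.02)}.
from yaml import safe_load

rule_yaml = """
index:
  - main
source: irc
query: "deploy failed"
sentiment: ">0.02"
"""
parsed = safe_load(rule_yaml)

sentiment = str(parsed["sentiment"]).strip()
operator, number = sentiment[0], float(sentiment[1:])
print(operator, number)  # > 0.02
```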
diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py
index 1b586e5..a3e4b10 100644
--- a/core/management/commands/processing.py
+++ b/core/management/commands/processing.py
@@ -2,12 +2,75 @@ import msgpack
 from django.core.management.base import BaseCommand
 from redis import StrictRedis
 
-from core.lib.rules import process_rules
+from core.lib.rules import rule_matched
+from core.models import NotificationRule
 from core.util import logs
 
 log = logs.get_logger("processing")
 
 
+def process_rules(data):
+    all_rules = NotificationRule.objects.filter(enabled=True)
+
+    for index, index_messages in data.items():
+        for message in index_messages:
+            for rule in all_rules:
+                parsed_rule = rule.parse()
+                matched = {}
+                if "index" not in parsed_rule:
+                    continue
+                if "source" not in parsed_rule:
+                    continue
+                rule_index = parsed_rule["index"]
+                rule_source = parsed_rule["source"]
+                # if not type(rule_index) == list:
+                #     rule_index = [rule_index]
+                # if not type(rule_source) == list:
+                #     rule_source = [rule_source]
+                if index not in rule_index:
+                    continue
+                if message["src"] not in rule_source:
+                    continue
+
+                matched["index"] = index
+                matched["source"] = message["src"]
+
+                rule_field_length = len(parsed_rule.keys())
+                matched_field_number = 0
+                for field, value in parsed_rule.items():
+                    # if not type(value) == list:
+                    #     value = [value]
+                    if field == "src":
+                        continue
+                    if field == "tokens":
+                        for token in value:
+                            if "tokens" in message:
+                                if token in message["tokens"]:
+                                    matched_field_number += 1
+                                    matched[field] = token
+                                    # Break out of the token matching loop
+                                    break
+                        # Continue to next field
+                        continue
+
+                    # Allow partial matches for msg
+                    if field == "msg":
+                        for msg in value:
+                            if "msg" in message:
+                                if msg.lower() in message["msg"].lower():
+                                    matched_field_number += 1
+                                    matched[field] = msg
+                                    # Break out of the msg matching loop
+                                    break
+                        # Continue to next field
+                        continue
+                    if field in message and message[field] in value:
+                        matched_field_number += 1
+                        matched[field] = message[field]
+                if matched_field_number == rule_field_length - 2:
+                    rule_matched(rule, message, matched)
+
+
 class Command(BaseCommand):
     def handle(self, *args, **options):
         r = StrictRedis(unix_socket_path="/var/run/socks/redis.sock", db=0)
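The matched_field_number == rule_field_length - 2 test reads: every rule field except index and source (already consumed to route the message) must also match. A worked example with a hypothetical rule and message:

```python
# Hypothetical rule and message demonstrating the counting logic.
parsed_rule = {
    "index": ["main"],
    "source": ["irc"],
    "msg": ["error"],  # partial, case-insensitive match
    "nick": ["sam"],   # exact match
}
message = {"src": "irc", "msg": "ERROR: disk full", "nick": "sam"}

required = len(parsed_rule) - 2  # index and source are pre-matched
matched = 0
if any(m.lower() in message["msg"].lower() for m in parsed_rule["msg"]):
    matched += 1
if message.get("nick") in parsed_rule["nick"]:
    matched += 1
print(matched == required)  # True -> rule_matched() would fire
```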
diff --git a/core/management/commands/scheduling.py b/core/management/commands/scheduling.py
index 255f038..1bd5fe5 100644
--- a/core/management/commands/scheduling.py
+++ b/core/management/commands/scheduling.py
@@ -1,25 +1,18 @@
 import asyncio
 
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from asgiref.sync import sync_to_async
 from django.core.management.base import BaseCommand
 
-# from core.db.storage import db
-# from core.models import NotificationRule
+from core.db.storage import db
+from core.lib.parsing import QueryError
+from core.lib.rules import NotificationRuleData
+from core.models import NotificationRule
 from core.util import logs
 
 log = logs.get_logger("scheduling")
 
-# INTERVAL_CHOICES = (
-#     (0, "On demand"),
-#     (60, "Every minute"),
-#     (900, "Every 15 minutes"),
-#     (1800, "Every 30 minutes"),
-#     (3600, "Every hour"),
-#     (14400, "Every 4 hours"),
-#     (86400, "Every day"),
-# )
-
-INTERVALS = [60, 900, 1800, 3600, 14400, 86400]
+INTERVALS = [5, 60, 900, 1800, 3600, 14400, 86400]
 
 
 async def job(interval_seconds):
@@ -27,10 +20,17 @@
     """
     Run all schedules matching the given interval.
     :param interval_seconds: The interval to run.
     """
-    print("Running schedule", interval_seconds)
-    # matching_rules = NotificationRule.objects.filter(
-    #     enabled=True, interval=interval_seconds
-    # )
+    matching_rules = await sync_to_async(list)(
+        NotificationRule.objects.filter(enabled=True, interval=interval_seconds)
+    )
+    for rule in matching_rules:
+        log.debug(f"Running rule {rule}")
+        try:
+            rule = NotificationRuleData(rule.user, rule, db=db)
+            await rule.run_schedule()
+            # results = await db.schedule_query_results(rule.user, rule)
+        except QueryError as e:
+            log.error(f"Error running rule {rule}: {e}")
 
 
 class Command(BaseCommand):
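The Command body for this file is outside the hunk, so the wiring below is an assumption rather than the patch's actual code: one AsyncIOScheduler job per interval, each invoking job(interval) on its own cadence.

```python
# Assumed scheduler wiring; handle() is not shown in this diff, so
# treat this as a sketch of the likely setup.
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler

INTERVALS = [5, 60, 900, 1800, 3600, 14400, 86400]


async def job(interval_seconds):
    print("tick", interval_seconds)  # stand-in for the real job()


def main():
    scheduler = AsyncIOScheduler()
    for interval in INTERVALS:
        scheduler.add_job(job, "interval", seconds=interval, args=[interval])
    scheduler.start()
    asyncio.get_event_loop().run_forever()


if __name__ == "__main__":
    main()
```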
""" - print("Running schedule", interval_seconds) - # matching_rules = NotificationRule.objects.filter( - # enabled=True, interval=interval_seconds - # ) + matching_rules = await sync_to_async(list)( + NotificationRule.objects.filter(enabled=True, interval=interval_seconds) + ) + for rule in matching_rules: + log.debug(f"Running rule {rule}") + try: + rule = NotificationRuleData(rule.user, rule, db=db) + await rule.run_schedule() + # results = await db.schedule_query_results(rule.user, rule) + except QueryError as e: + log.error(f"Error running rule {rule}: {e}") class Command(BaseCommand): diff --git a/core/migrations/0018_alter_perms_options_notificationrule_match_and_more.py b/core/migrations/0018_alter_perms_options_notificationrule_match_and_more.py new file mode 100644 index 0000000..3dfe235 --- /dev/null +++ b/core/migrations/0018_alter_perms_options_notificationrule_match_and_more.py @@ -0,0 +1,27 @@ +# Generated by Django 4.1.5 on 2023-01-15 00:58 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0017_alter_notificationrule_interval'), + ] + + operations = [ + migrations.AlterModelOptions( + name='perms', + options={'permissions': (('post_irc', 'Can post to IRC'), ('post_discord', 'Can post to Discord'), ('use_insights', 'Can use the Insights page'), ('use_rules', 'Can use the Rules page'), ('index_internal', 'Can use the internal index'), ('index_meta', 'Can use the meta index'), ('index_restricted', 'Can use the restricted index'), ('restricted_sources', 'Can access restricted sources'))}, + ), + migrations.AddField( + model_name='notificationrule', + name='match', + field=models.BooleanField(default=False), + ), + migrations.AlterField( + model_name='notificationrule', + name='interval', + field=models.IntegerField(choices=[(0, 'On demand'), (5, 'Every 5 seconds'), (60, 'Every minute'), (900, 'Every 15 minutes'), (1800, 'Every 30 minutes'), (3600, 'Every hour'), (14400, 'Every 4 hours'), (86400, 'Every day')], default=0), + ), + ] diff --git a/core/migrations/0019_alter_notificationrule_match.py b/core/migrations/0019_alter_notificationrule_match.py new file mode 100644 index 0000000..3e5f796 --- /dev/null +++ b/core/migrations/0019_alter_notificationrule_match.py @@ -0,0 +1,18 @@ +# Generated by Django 4.1.5 on 2023-01-15 01:52 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0018_alter_perms_options_notificationrule_match_and_more'), + ] + + operations = [ + migrations.AlterField( + model_name='notificationrule', + name='match', + field=models.JSONField(blank=True, null=True), + ), + ] diff --git a/core/models.py b/core/models.py index 43c16ec..5c8bbce 100644 --- a/core/models.py +++ b/core/models.py @@ -26,6 +26,7 @@ PRIORITY_CHOICES = ( INTERVAL_CHOICES = ( (0, "On demand"), + (5, "Every 5 seconds"), (60, "Every minute"), (900, "Every 15 minutes"), (1800, "Every 30 minutes"), @@ -169,6 +170,7 @@ class NotificationRule(models.Model): window = models.CharField(max_length=255, null=True, blank=True) enabled = models.BooleanField(default=True) data = models.TextField() + match = models.JSONField(null=True, blank=True) def __str__(self): return f"{self.user} - {self.name}" diff --git a/core/views/notifications.py b/core/views/notifications.py index 2f95c62..ba923f3 100644 --- a/core/views/notifications.py +++ b/core/views/notifications.py @@ -34,7 +34,7 @@ class NotificationsUpdate(LoginRequiredMixin, PermissionRequiredMixin, ObjectUpd 
diff --git a/core/views/notifications.py b/core/views/notifications.py
index 2f95c62..ba923f3 100644
--- a/core/views/notifications.py
+++ b/core/views/notifications.py
@@ -34,7 +34,7 @@ class NotificationsUpdate(LoginRequiredMixin, PermissionRequiredMixin, ObjectUpdate
 class RuleList(LoginRequiredMixin, ObjectList):
     list_template = "partials/rule-list.html"
     model = NotificationRule
-    page_title = "List of notification rules."
+    page_title = "List of notification rules"
 
     list_url_name = "rules"
     list_url_args = ["type"]
diff --git a/requirements.txt b/requirements.txt
index 894e9a2..0ecc574 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,7 +4,7 @@ django
 pre-commit
 django-crispy-forms
 crispy-bulma
-elasticsearch
+elasticsearch[async]
 stripe
 django-rest-framework
 numpy
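Requirements note: the [async] extra pulls in aiohttp so elasticsearch-py can provide AsyncElasticsearch. As written in the elastic.py hunk above, async_run_query awaits nothing — it calls the synchronous client — so a fully asynchronous path would look roughly like this sketch (connection details are placeholders, not the project's configuration):

```python
# Sketch only: a genuinely async query path via the client enabled by
# the elasticsearch[async] extra. Error handling mirrors run_query.
from elasticsearch import AsyncElasticsearch
from elasticsearch.exceptions import NotFoundError, RequestError

client = AsyncElasticsearch(hosts=["http://localhost:9200"])


async def async_run_query(search_query, index):
    try:
        return await client.search(body=search_query, index=index)
    except (RequestError, NotFoundError) as err:
        return err
```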