diff --git a/core/lib/notify.py b/core/lib/notify.py index b9692d4..99188e9 100644 --- a/core/lib/notify.py +++ b/core/lib/notify.py @@ -9,6 +9,17 @@ log = logs.get_logger(__name__) # Actual function to send a message to a topic def ntfy_sendmsg(**kwargs): + """ + Send a message to a topic using NTFY. + kwargs: + msg: Message to send, must be specified + notification_settings: Notification settings, must be specified + url: URL to NTFY server, can be None to use default + topic: Topic to send message to, must be specified + priority: Priority of message, optional + title: Title of message, optional + tags: Tags to add to message, optional + """ msg = kwargs.get("msg", None) notification_settings = kwargs.get("notification_settings") @@ -36,6 +47,12 @@ def ntfy_sendmsg(**kwargs): def webhook_sendmsg(**kwargs): + """ + Send a message to a webhook. + kwargs: + msg: Message to send, must be specified + notification_settings: Notification settings, must be specified + url: URL to webhook, must be specified""" msg = kwargs.get("msg", None) notification_settings = kwargs.get("notification_settings") url = notification_settings.get("url") @@ -50,6 +67,26 @@ def webhook_sendmsg(**kwargs): # Sendmsg helper to send a message to a user's notification settings def sendmsg(**kwargs): + """ + Send a message to a user's notification settings. + Fetches the user's default notification settings if not specified. + kwargs: + user: User to send message to, must be specified + notification_settings: Notification settings, optional + service: Notification service to use + + kwargs for both services: + msg: Message to send, must be specified + notification_settings: Notification settings, must be specified + url: URL to NTFY server, can be None to use default + + extra kwargs for ntfy: + title: Title of message, optional + tags: Tags to add to message, optional + notification_settings: Notification settings, must be specified + topic: Topic to send message to, must be specified + priority: Priority of message, optional + """ user = kwargs.get("user", None) notification_settings = kwargs.get( "notification_settings", user.get_notification_settings().__dict__ diff --git a/core/lib/rules.py b/core/lib/rules.py index 92981c2..cb1b013 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -31,6 +31,14 @@ class RuleParseError(Exception): def format_ntfy(**kwargs): """ Format a message for ntfy. + If the message is a list, it will be joined with newlines. + If the message is None, it will be replaced with an empty string. + If specified, `matched` will be pretty-printed in the first line. + kwargs: + rule: The rule object, must be specified + index: The index the rule matched on, can be None + message: The message to send, can be None + matched: The matched fields, can be None """ rule = kwargs.get("rule") index = kwargs.get("index") @@ -40,9 +48,9 @@ def format_ntfy(**kwargs): # Dump the message in YAML for readability messages_formatted = "" if isinstance(message, list): - for message in message: + for message_iter in message: messages_formatted += dump( - message, Dumper=Dumper, default_flow_style=False + message_iter, Dumper=Dumper, default_flow_style=False ) messages_formatted += "\n" else: @@ -64,6 +72,17 @@ def format_ntfy(**kwargs): def format_webhook(**kwargs): """ Format a message for a webhook. + Adds some metadata to the message that would normally be only in + notification_settings. + Dumps the message in JSON. + kwargs: + rule: The rule object, must be specified + index: The index the rule matched on, can be None + message: The message to send, can be None, but will be sent as None + matched: The matched fields, can be None, but will be sent as None + notification_settings: The notification settings, must be specified + priority: The priority of the message, optional + topic: The topic of the message, optional """ rule = kwargs.get("rule") index = kwargs.get("index") @@ -87,14 +106,30 @@ def format_webhook(**kwargs): def rule_notify(rule, index, message, matched): + """ + Send a notification for a matching rule. + Gets the notification settings for the rule. + Runs the formatting helpers for the service. + :param rule: The rule object, must be specified + :param index: The index the rule matched on, can be None + :param message: The message to send, can be None + :param matched: The matched fields, can be None + """ + # If there is no message, don't say anything matched if message: word = "match" else: word = "no match" + title = f"Rule {rule.name} {word} on {index}" + + # The user notification settings are merged in with this notification_settings = rule.get_notification_settings() if not notification_settings: + # No/invalid notification settings, don't send anything return + + # Create a cast we can reuse for the formatting helpers and sendmsg cast = { "title": title, "user": rule.user, @@ -104,6 +139,7 @@ def rule_notify(rule, index, message, matched): "matched": matched, "notification_settings": notification_settings, } + if rule.service == "ntfy": cast["msg"] = format_ntfy(**cast) @@ -118,7 +154,7 @@ class NotificationRuleData(object): self.user = user self.object = None - # We are running live + # We are running live and have been passed a database object if not isinstance(cleaned_data, dict): self.object = cleaned_data cleaned_data = cleaned_data.__dict__ @@ -141,6 +177,10 @@ class NotificationRuleData(object): self.populate_matched() def populate_matched(self): + """ + On first creation, the match field is None. We need to populate it with + a dictionary containing the index names as keys and False as values. + """ if self.object.match is None: self.object.match = {} for index in self.parsed["index"]: @@ -151,6 +191,9 @@ class NotificationRuleData(object): def store_match(self, index, match): """ Store a match result. + Accepts None for the index to set all indices. + :param index: the index to store the match for, can be None + :param match: True or False, indicating if the rule matched """ if self.object.match is None: self.object.match = {} @@ -168,6 +211,8 @@ class NotificationRuleData(object): def get_match(self, index=None): """ Get a match result for an index. + If the index is None, it will return True if any index has a match. + :param index: the index to get the match for, can be None """ if self.object.match is None: self.object.match = {} @@ -191,6 +236,8 @@ class NotificationRuleData(object): {"avg_sentiment": {"value": 0.6}} It's matched already, we just need to format it like so: {"avg_sentiment": "0.06>0.5"} + :param aggs: the aggregations to format + :return: the formatted aggregations """ new_aggs = {} for agg_name, agg in aggs.items(): @@ -203,6 +250,11 @@ class NotificationRuleData(object): def rule_matched(self, index, message, aggs): """ A rule has matched. + If the previous run did not match, send a notification after formatting + the aggregations. + :param index: the index the rule matched on + :param message: the message object that matched + :param aggs: the aggregations that matched """ current_match = self.get_match(index) log.debug(f"Rule matched: {index} - current match: {current_match}") @@ -215,6 +267,10 @@ class NotificationRuleData(object): def rule_no_match(self, index=None): """ A rule has not matched. + If the previous run did match, send a notification if configured to notify + for empty matches. + :param index: the index the rule did not match on, can be None + """ current_match = self.get_match(index) log.debug(f"Rule not matched: {index} - current match: {current_match}") @@ -227,14 +283,19 @@ class NotificationRuleData(object): async def run_schedule(self): """ Run the schedule query. + Get the results from the database, and check if the rule has matched. + Check if all of the required aggregations have matched. """ response = await self.db.schedule_query_results(self) if not response: + # No results in the result_map self.rule_no_match() for index, (aggs, results) in response.items(): if not results: + # Falsy results, no matches self.rule_not_matched(index) + # Add the match values of all aggregations to a list aggs_for_index = [] for agg_name in self.aggs.keys(): if agg_name in aggs: @@ -244,15 +305,18 @@ class NotificationRuleData(object): # All required aggs are present if len(aggs_for_index) == len(self.aggs.keys()): if all(aggs_for_index): - # Ensure we only send notifications when the previous run - # did not return any matches + # All aggs have matched self.rule_matched(index, results[: self.object.amount], aggs) continue - self.rule_not_matched(index) + # Default branch, since the happy path has a continue keyword + self.rule_no_match(index) def test_schedule(self): """ Test the schedule query to ensure it is valid. + Run the query with the async_to_sync helper so we can call it from + a form. + Raises an exception if the query is invalid. """ if self.db: sync_schedule = async_to_sync(self.db.schedule_query_results) @@ -266,6 +330,7 @@ class NotificationRuleData(object): tokens: can be list, it will ensure the message matches any token. msg: can be a list, it will ensure the message contains any msg. No other fields can be lists containing more than one item. + :raises RuleParseError: if the fields are invalid """ is_schedule = self.is_schedule @@ -321,6 +386,10 @@ class NotificationRuleData(object): @property def is_schedule(self): + """ + Check if the rule is a schedule rule. + :return: True if the rule is a schedule rule, False otherwise + """ if "interval" in self.cleaned_data: if self.cleaned_data["interval"] != 0: return True @@ -328,7 +397,8 @@ class NotificationRuleData(object): def ensure_list(self): """ - Ensure all values are lists. + Ensure all values in the data field are lists. + Convert all strings to lists with one item. """ for field, value in self.parsed.items(): if not isinstance(value, list): @@ -337,6 +407,7 @@ class NotificationRuleData(object): def validate_user_permissions(self): """ Ensure the user can use notification rules. + :raises RuleParseError: if the user does not have permission """ if not self.user.has_perm("core.use_rules"): raise RuleParseError("User does not have permission to use rules", "data") @@ -345,6 +416,12 @@ class NotificationRuleData(object): """ Validate the interval and window fields. Prohibit window being specified with an ondemand interval. + Prohibit window not being specified with a non-ondemand interval. + Prohibit amount being specified with an on-demand interval. + Prohibut amount not being specified with a non-ondemand interval. + Validate window field. + Validate window unit and enforce maximum. + :raises RuleParseError: if the fields are invalid """ interval = self.cleaned_data.get("interval") window = self.cleaned_data.get("window") @@ -403,6 +480,9 @@ class NotificationRuleData(object): def validate_permissions(self): """ Validate permissions for the source and index variables. + Also set the default values for the user if not present. + Stores the default or expanded values in the parsed field. + :raises QueryError: if the user does not have permission to use the source """ if "index" in self.parsed: index = self.parsed["index"] @@ -431,6 +511,7 @@ class NotificationRuleData(object): def parse_data(self): """ Parse the data in the text field to YAML. + :raises RuleParseError: if the data is invalid """ try: self.parsed = load(self.data, Loader=Loader) @@ -438,7 +519,13 @@ class NotificationRuleData(object): raise RuleParseError("data", f"Invalid YAML: {e}") def __str__(self): + """ + Get a YAML representation of the data field of the rule. + """ return dump(self.parsed, Dumper=Dumper) def get_data(self): + """ + Return the data field as a dictionary. + """ return self.parsed diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py index 736dd02..e64f558 100644 --- a/core/management/commands/processing.py +++ b/core/management/commands/processing.py @@ -19,6 +19,7 @@ def process_rules(data): # up a NotificationRuleData object parsed_rule = rule.parse() matched = {} + # Rule is invalid, this shouldn't happen if "index" not in parsed_rule: continue if "source" not in parsed_rule: @@ -30,8 +31,10 @@ def process_rules(data): # if not type(rule_source) == list: # rule_source = [rule_source] if index not in rule_index: + # We don't care about this index, go to the next one continue if message["src"] not in rule_source: + # We don't care about this source, go to the next one continue matched["index"] = index @@ -43,8 +46,11 @@ def process_rules(data): # if not type(value) == list: # value = [value] if field == "src": + # We already checked this continue if field == "tokens": + # Check if tokens are in the rule + # We only check if *at least one* token matches for token in value: if "tokens" in message: if token in message["tokens"]: @@ -55,8 +61,8 @@ def process_rules(data): # Continue to next field continue - # Allow partial matches for msg if field == "msg": + # Allow partial matches for msg for msg in value: if "msg" in message: if msg.lower() in message["msg"].lower(): @@ -67,8 +73,10 @@ def process_rules(data): # Continue to next field continue if field in message and message[field] in value: + # Do exact matches for all other fields matched_field_number += 1 matched[field] = message[field] + # Subtract 2, 1 for source and 1 for index if matched_field_number == rule_field_length - 2: rule_notify(rule, index, message, matched) diff --git a/core/management/commands/scheduling.py b/core/management/commands/scheduling.py index 1bd5fe5..28b4325 100644 --- a/core/management/commands/scheduling.py +++ b/core/management/commands/scheduling.py @@ -6,7 +6,7 @@ from django.core.management.base import BaseCommand from core.db.storage import db from core.lib.parsing import QueryError -from core.lib.rules import NotificationRuleData +from core.lib.rules import NotificationRuleData, RuleParseError from core.models import NotificationRule from core.util import logs @@ -31,6 +31,8 @@ async def job(interval_seconds): # results = await db.schedule_query_results(rule.user, rule) except QueryError as e: log.error(f"Error running rule {rule}: {e}") + except RuleParseError as e: + log.error(f"Error parsing rule {rule}: {e}") class Command(BaseCommand):