From a2207bbcf40985049ba2dda1c24cf2b6e60c7e94 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Mon, 16 Jan 2023 00:10:41 +0000 Subject: [PATCH] Support sending messages when a rule no longer matches and fix dual-use notification sender --- core/forms.py | 2 + core/lib/notify.py | 43 +++--- core/lib/rules.py | 141 +++++++++++++----- core/management/commands/processing.py | 4 +- ...22_notificationrule_send_empty_and_more.py | 23 +++ core/models.py | 23 ++- 6 files changed, 173 insertions(+), 63 deletions(-) create mode 100644 core/migrations/0022_notificationrule_send_empty_and_more.py diff --git a/core/forms.py b/core/forms.py index f9670f2..e4b53c5 100644 --- a/core/forms.py +++ b/core/forms.py @@ -108,6 +108,7 @@ class NotificationRuleForm(RestrictedFormMixin, ModelForm): "topic", "url", "service", + "send_empty", "enabled", ) help_texts = { @@ -121,6 +122,7 @@ class NotificationRuleForm(RestrictedFormMixin, ModelForm): "interval": "How often to run the search. On demand evaluates messages as they are received, without running a scheduled search. The remaining options schedule a search of the database with the window below.", "window": "Time window to search: 1d, 1h, 1m, 1s, etc.", "amount": "Amount of matches to be returned for scheduled queries. Cannot be used with on-demand queries.", + "send_empty": "Send a notification if no matches are found.", } def clean(self): diff --git a/core/lib/notify.py b/core/lib/notify.py index 00613d4..b9692d4 100644 --- a/core/lib/notify.py +++ b/core/lib/notify.py @@ -8,12 +8,15 @@ log = logs.get_logger(__name__) # Actual function to send a message to a topic -def ntfy_sendmsg(msg, **kwargs): +def ntfy_sendmsg(**kwargs): + msg = kwargs.get("msg", None) + notification_settings = kwargs.get("notification_settings") + title = kwargs.get("title", None) - priority = kwargs.get("priority", None) + priority = notification_settings.get("priority", None) tags = kwargs.get("tags", None) - url = kwargs.get("url", NTFY_URL) - topic = kwargs.get("topic", None) + url = notification_settings.get("url") or NTFY_URL + topic = notification_settings.get("topic", None) headers = {"Title": "Fisk"} if title: @@ -32,7 +35,10 @@ def ntfy_sendmsg(msg, **kwargs): log.error(f"Error sending notification: {e}") -def webhook_sendmsg(msg, url): +def webhook_sendmsg(**kwargs): + msg = kwargs.get("msg", None) + notification_settings = kwargs.get("notification_settings") + url = notification_settings.get("url") try: requests.post( f"{url}", @@ -43,24 +49,17 @@ def webhook_sendmsg(msg, url): # Sendmsg helper to send a message to a user's notification settings -def sendmsg(user, msg, **kwargs): - service = kwargs.get("service", "ntfy") - notification_settings = user.get_notification_settings() +def sendmsg(**kwargs): + user = kwargs.get("user", None) + notification_settings = kwargs.get( + "notification_settings", user.get_notification_settings().__dict__ + ) + if not notification_settings: + return - # No custom topic specified - if "topic" not in kwargs: - # No user topic set either - if notification_settings.topic is None: - # No topic set, so don't send - return - else: - kwargs["topic"] = notification_settings.topic - - if "url" not in kwargs: - if notification_settings.url is not None: - kwargs["url"] = notification_settings.url + service = notification_settings.get("service") if service == "ntfy": - ntfy_sendmsg(msg, **kwargs) + ntfy_sendmsg(**kwargs) elif service == "webhook": - webhook_sendmsg(msg, kwargs["url"]) + webhook_sendmsg(**kwargs) diff --git a/core/lib/rules.py b/core/lib/rules.py index e7e2f62..f59cd1b 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -28,15 +28,15 @@ class RuleParseError(Exception): self.field = field -def rule_matched(rule, message, matched): - title = f"Rule {rule.name} matched" - notification_settings = rule.get_notification_settings() - cast = { - "title": title, - **notification_settings, - } - - if rule.service == "ntfy": +def format_ntfy(**kwargs): + """ + Format a message for ntfy. + """ + rule = kwargs.get("rule") + index = kwargs.get("index") + message = kwargs.get("message") + matched = kwargs.get("matched") + if message: # Dump the message in YAML for readability messages_formatted = "" if isinstance(message, list): @@ -47,25 +47,70 @@ def rule_matched(rule, message, matched): messages_formatted += "\n" else: messages_formatted = dump(message, Dumper=Dumper, default_flow_style=False) - matched = ", ".join([f"{k}: {v}" for k, v in matched.items()]) + else: + messages_formatted = "" - notify_message = f"{rule.name} match: {matched}\n{messages_formatted}" - notify_message = notify_message.encode("utf-8", "replace") + if matched: + matched = ", ".join([f"{k}: {v}" for k, v in matched.items()]) + else: + matched = "" + + notify_message = f"{rule.name} on {index}: {matched}\n{messages_formatted}" + notify_message = notify_message.encode("utf-8", "replace") + + return notify_message + + +def format_webhook(**kwargs): + """ + Format a message for a webhook. + """ + rule = kwargs.get("rule") + index = kwargs.get("index") + message = kwargs.get("message") + matched = kwargs.get("matched") + notification_settings = kwargs.get("notification_settings") + notify_message = { + "rule_id": rule.id, + "rule_name": rule.name, + "match": matched, + "index": index, + "data": message, + } + if "priority" in notification_settings: + notify_message["priority"] = notification_settings["priority"] + if "topic" in notification_settings: + notify_message["topic"] = notification_settings["topic"] + notify_message = orjson.dumps(notify_message) + + return notify_message + + +def rule_notify(rule, index, message, matched): + if message: + word = "match" + else: + word = "no match" + title = f"Rule {rule.name} {word} on {index}" + notification_settings = rule.get_notification_settings() + if not notification_settings: + return + cast = { + "title": title, + "user": rule.user, + "rule": rule, + "index": index, + "message": message, + "matched": matched, + "notification_settings": notification_settings, + } + if rule.service == "ntfy": + cast["msg"] = format_ntfy(**cast) elif rule.service == "webhook": - notify_message = { - "rule_id": rule.id, - "rule_name": rule.name, - "match": matched, - "data": message, - } - if "priority" in notification_settings: - notify_message["priority"] = notification_settings["priority"] - if "topic" in notification_settings: - notify_message["topic"] = notification_settings["topic"] - notify_message = orjson.dumps(notify_message) + cast["msg"] = format_webhook(**cast) - sendmsg(rule.user, notify_message, **cast) + sendmsg(**cast) class NotificationRuleData(object): @@ -101,11 +146,15 @@ class NotificationRuleData(object): if not isinstance(self.object.match, dict): self.object.match = {} - self.object.match[index] = match + if index is None: + for index_iter in self.parsed["index"]: + self.object.match[index_iter] = match + else: + self.object.match[index] = match self.object.save() log.debug(f"Stored match: {index} - {match}") - def get_match(self, index): + def get_match(self, index=None): """ Get a match result for an index. """ @@ -114,6 +163,10 @@ class NotificationRuleData(object): if not isinstance(self.object.match, dict): return None + if index is None: + # Check if we have any matches on all indices + return any(self.object.match.values()) + return self.object.match.get(index) def format_aggs(self, aggs): @@ -134,14 +187,38 @@ class NotificationRuleData(object): return new_aggs + def rule_matched(self, index, message, aggs): + """ + A rule has matched. + """ + current_match = self.get_match(index) + if current_match is False: + # Matched now, but not before + formatted_aggs = self.format_aggs(aggs) + rule_notify(self.object, index, message, formatted_aggs) + self.store_match(index, True) + + def rule_no_match(self, index=None): + """ + A rule has not matched. + """ + current_match = self.get_match(index) + if current_match is True: + # Matched before, but not now + if self.object.send_empty: + rule_notify(self.object, index, "no_match", None) + self.store_match(index, False) + async def run_schedule(self): """ Run the schedule query. """ response = await self.db.schedule_query_results(self) + if not response: + self.rule_no_match() for index, (aggs, results) in response.items(): if not results: - self.store_match(index, False) + self.rule_not_matched(index) aggs_for_index = [] for agg_name in self.aggs.keys(): @@ -154,15 +231,9 @@ class NotificationRuleData(object): if all(aggs_for_index): # Ensure we only send notifications when the previous run # did not return any matches - current_match = self.get_match(index) - if current_match is False: - formatted_aggs = self.format_aggs(aggs) - rule_matched( - self.object, results[: self.object.amount], formatted_aggs - ) - self.store_match(index, True) + self.rule_matched(index, results[: self.object.amount], aggs) continue - self.store_match(index, False) + self.rule_not_matched(index) def test_schedule(self): """ diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py index 1cd6d4c..dd7f4fc 100644 --- a/core/management/commands/processing.py +++ b/core/management/commands/processing.py @@ -2,7 +2,7 @@ import msgpack from django.core.management.base import BaseCommand from redis import StrictRedis -from core.lib.rules import rule_matched +from core.lib.rules import rule_notify from core.models import NotificationRule from core.util import logs @@ -70,7 +70,7 @@ def process_rules(data): matched_field_number += 1 matched[field] = message[field] if matched_field_number == rule_field_length - 2: - rule_matched(rule, message, matched) + rule_notify(rule, index, message, matched) class Command(BaseCommand): diff --git a/core/migrations/0022_notificationrule_send_empty_and_more.py b/core/migrations/0022_notificationrule_send_empty_and_more.py new file mode 100644 index 0000000..68eb0ee --- /dev/null +++ b/core/migrations/0022_notificationrule_send_empty_and_more.py @@ -0,0 +1,23 @@ +# Generated by Django 4.1.5 on 2023-01-15 23:34 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0021_notificationrule_amount_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='notificationrule', + name='send_empty', + field=models.BooleanField(default=False), + ), + migrations.AlterField( + model_name='notificationrule', + name='amount', + field=models.PositiveIntegerField(blank=True, default=1, null=True), + ), + ] diff --git a/core/models.py b/core/models.py index 715e4ca..4bca3f0 100644 --- a/core/models.py +++ b/core/models.py @@ -92,8 +92,14 @@ class User(AbstractUser): plan_list = [plan.name for plan in self.plans.all()] return plan in plan_list - def get_notification_settings(self): - return NotificationSettings.objects.get_or_create(user=self)[0] + def get_notification_settings(self, check=True): + sets = NotificationSettings.objects.get_or_create(user=self)[0] + if check: + if sets.service == "ntfy" and sets.topic is None: + return None + if sets.service == "webhook" and sets.url is None: + return None + return sets @property def allowed_indices(self): @@ -179,6 +185,7 @@ class NotificationRule(models.Model): data = models.TextField() match = models.JSONField(null=True, blank=True) service = models.CharField(choices=SERVICE_CHOICES, max_length=255, default="ntfy") + send_empty = models.BooleanField(default=False) def __str__(self): return f"{self.user} - {self.name}" @@ -198,12 +205,12 @@ class NotificationRule(models.Model): if isinstance(self.match, dict): return f"{sum(list(self.match.values()))}/{len(self.match)}" - def get_notification_settings(self): + def get_notification_settings(self, check=True): """ Get the notification settings for this rule. Notification rule settings take priority. """ - user_settings = self.user.get_notification_settings() + user_settings = self.user.get_notification_settings(check=False) user_settings = user_settings.__dict__ if self.priority is not None: user_settings["priority"] = str(self.priority) @@ -213,6 +220,14 @@ class NotificationRule(models.Model): user_settings["url"] = self.url if self.service is not None: user_settings["service"] = self.service + if self.send_empty is not None: + user_settings["send_empty"] = self.send_empty + + if check: + if user_settings["service"] == "ntfy" and user_settings["topic"] is None: + return None + if user_settings["service"] == "webhook" and user_settings["url"] is None: + return None return user_settings