From 75603570fff3dfa3afd64a75e09c72fca5326ad0 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Sun, 15 Jan 2023 23:02:13 +0000 Subject: [PATCH] Finish implementing webhook delivery --- core/db/__init__.py | 8 ++- core/db/elastic.py | 9 ++- core/forms.py | 2 + core/lib/notify.py | 38 ++++++---- core/lib/rules.py | 72 +++++++++++++++---- .../0021_notificationrule_amount_and_more.py | 28 ++++++++ core/models.py | 18 +++++ 7 files changed, 144 insertions(+), 31 deletions(-) create mode 100644 core/migrations/0021_notificationrule_amount_and_more.py diff --git a/core/db/__init__.py b/core/db/__init__.py index fe1fa9c..1eb2996 100644 --- a/core/db/__init__.py +++ b/core/db/__init__.py @@ -98,9 +98,11 @@ class StorageBackend(ABC): for tagname, tagvalue in item.items(): add_bool.append({tagname: tagvalue}) - valid = self.check_valid_query(query_params, custom_query) - if isinstance(valid, dict): - return valid + bypass_check = kwargs.get("bypass_check", False) + if not bypass_check: + valid = self.check_valid_query(query_params, custom_query, **kwargs) + if isinstance(valid, dict): + return valid return search_query diff --git a/core/db/elastic.py b/core/db/elastic.py index 7e5be08..21f10db 100644 --- a/core/db/elastic.py +++ b/core/db/elastic.py @@ -135,7 +135,7 @@ class ElasticsearchBackend(StorageBackend): ) return query - def construct_query(self, query, size=None, blank=False): + def construct_query(self, query, size=None, blank=False, **kwargs): """ Accept some query parameters and construct an Elasticsearch query. """ @@ -252,7 +252,10 @@ class ElasticsearchBackend(StorageBackend): if field not in ["source", "index", "tags", "query", "sentiment"]: for value in values: add_top.append({"match": {field: value}}) - search_query = self.parse_query(data, tags, None, False, add_bool) + # Bypass the check for query and tags membership since we can search by msg, etc + search_query = self.parse_query( + data, tags, None, False, add_bool, bypass_check=True + ) self.add_bool(search_query, add_bool) self.add_top(search_query, add_top) if "sentiment" in data: @@ -503,4 +506,6 @@ class ElasticsearchBackend(StorageBackend): search_query["query"]["bool"]["must_not"] = [item] else: for item in add_top: + if "query" not in search_query: + search_query["query"] = {"bool": {"must": []}} search_query["query"]["bool"]["must"].append(item) diff --git a/core/forms.py b/core/forms.py index 98a1a5d..f9670f2 100644 --- a/core/forms.py +++ b/core/forms.py @@ -103,6 +103,7 @@ class NotificationRuleForm(RestrictedFormMixin, ModelForm): "data", "interval", "window", + "amount", "priority", "topic", "url", @@ -119,6 +120,7 @@ class NotificationRuleForm(RestrictedFormMixin, ModelForm): "data": "The notification rule definition.", "interval": "How often to run the search. On demand evaluates messages as they are received, without running a scheduled search. The remaining options schedule a search of the database with the window below.", "window": "Time window to search: 1d, 1h, 1m, 1s, etc.", + "amount": "Amount of matches to be returned for scheduled queries. Cannot be used with on-demand queries.", } def clean(self): diff --git a/core/lib/notify.py b/core/lib/notify.py index d63a001..00613d4 100644 --- a/core/lib/notify.py +++ b/core/lib/notify.py @@ -8,7 +8,13 @@ log = logs.get_logger(__name__) # Actual function to send a message to a topic -def raw_sendmsg(msg, title=None, priority=None, tags=None, url=None, topic=None): +def ntfy_sendmsg(msg, **kwargs): + title = kwargs.get("title", None) + priority = kwargs.get("priority", None) + tags = kwargs.get("tags", None) + url = kwargs.get("url", NTFY_URL) + topic = kwargs.get("topic", None) + headers = {"Title": "Fisk"} if title: headers["Title"] = title @@ -22,15 +28,23 @@ def raw_sendmsg(msg, title=None, priority=None, tags=None, url=None, topic=None) data=msg, headers=headers, ) - print("Sent notification to", url) - print("topic", topic) - print("msg", msg) except requests.exceptions.ConnectionError as e: log.error(f"Error sending notification: {e}") +def webhook_sendmsg(msg, url): + try: + requests.post( + f"{url}", + data=msg, + ) + except requests.exceptions.ConnectionError as e: + log.error(f"Error sending webhook: {e}") + + # Sendmsg helper to send a message to a user's notification settings -def sendmsg(user, *args, **kwargs): +def sendmsg(user, msg, **kwargs): + service = kwargs.get("service", "ntfy") notification_settings = user.get_notification_settings() # No custom topic specified @@ -42,11 +56,11 @@ def sendmsg(user, *args, **kwargs): else: kwargs["topic"] = notification_settings.topic - if "url" in kwargs: - url = kwargs["url"] - elif notification_settings.url is not None: - url = notification_settings.url - else: - url = NTFY_URL + if "url" not in kwargs: + if notification_settings.url is not None: + kwargs["url"] = notification_settings.url - raw_sendmsg(*args, **kwargs, url=url) + if service == "ntfy": + ntfy_sendmsg(msg, **kwargs) + elif service == "webhook": + webhook_sendmsg(msg, kwargs["url"]) diff --git a/core/lib/rules.py b/core/lib/rules.py index a599640..e7e2f62 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -8,6 +8,7 @@ try: except ImportError: from yaml import Loader, Dumper +import orjson from asgiref.sync import async_to_sync from core.lib.notify import sendmsg @@ -29,20 +30,41 @@ class RuleParseError(Exception): def rule_matched(rule, message, matched): title = f"Rule {rule.name} matched" - - # Dump the message in YAML for readability - message = dump(message, Dumper=Dumper, default_flow_style=False) - matched = ", ".join([f"{k}: {v}" for k, v in matched.items()]) - - notify_message = f"{rule.name} match: {matched}\n{message}" - notify_message = notify_message.encode("utf-8", "replace") - + notification_settings = rule.get_notification_settings() cast = { "title": title, - "priority": str(rule.priority), + **notification_settings, } - if rule.topic is not None: - cast["topic"] = rule.topic + + if rule.service == "ntfy": + # Dump the message in YAML for readability + messages_formatted = "" + if isinstance(message, list): + for message in message: + messages_formatted += dump( + message, Dumper=Dumper, default_flow_style=False + ) + messages_formatted += "\n" + else: + messages_formatted = dump(message, Dumper=Dumper, default_flow_style=False) + matched = ", ".join([f"{k}: {v}" for k, v in matched.items()]) + + notify_message = f"{rule.name} match: {matched}\n{messages_formatted}" + notify_message = notify_message.encode("utf-8", "replace") + + elif rule.service == "webhook": + notify_message = { + "rule_id": rule.id, + "rule_name": rule.name, + "match": matched, + "data": message, + } + if "priority" in notification_settings: + notify_message["priority"] = notification_settings["priority"] + if "topic" in notification_settings: + notify_message["topic"] = notification_settings["topic"] + notify_message = orjson.dumps(notify_message) + sendmsg(rule.user, notify_message, **cast) @@ -135,7 +157,9 @@ class NotificationRuleData(object): current_match = self.get_match(index) if current_match is False: formatted_aggs = self.format_aggs(aggs) - rule_matched(self.object, results[:5], formatted_aggs) + rule_matched( + self.object, results[: self.object.amount], formatted_aggs + ) self.store_match(index, True) continue self.store_match(index, False) @@ -238,16 +262,36 @@ class NotificationRuleData(object): """ interval = self.cleaned_data.get("interval") window = self.cleaned_data.get("window") - if interval == 0 and window is not None: + amount = self.cleaned_data.get("amount") + + on_demand = interval == 0 + if on_demand and window is not None: + # Interval is on demand and window is specified + # We can't have a window with on-demand rules raise RuleParseError( "Window cannot be specified with on-demand interval", "window" ) - if interval != 0 and window is None: + if not on_demand and window is None: + # Interval is not on demand and window is not specified + # We can't have a non-on-demand interval without a window raise RuleParseError( "Window must be specified with non-on-demand interval", "window" ) + if not on_demand and amount is None: + # Interval is not on demand and amount is not specified + # We can't have a non-on-demand interval without an amount + raise RuleParseError( + "Amount must be specified with non-on-demand interval", "amount" + ) + if on_demand and amount is not None: + # Interval is on demand and amount is specified + # We can't have an amount with on-demand rules + raise RuleParseError( + "Amount cannot be specified with on-demand interval", "amount" + ) + if window is not None: window_number = window[:-1] if not window_number.isdigit(): diff --git a/core/migrations/0021_notificationrule_amount_and_more.py b/core/migrations/0021_notificationrule_amount_and_more.py new file mode 100644 index 0000000..e7d0e5a --- /dev/null +++ b/core/migrations/0021_notificationrule_amount_and_more.py @@ -0,0 +1,28 @@ +# Generated by Django 4.1.5 on 2023-01-15 20:45 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0020_rename_ntfy_topic_notificationsettings_topic_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='notificationrule', + name='amount', + field=models.IntegerField(blank=True, default=1, null=True), + ), + migrations.AlterField( + model_name='notificationrule', + name='service', + field=models.CharField(choices=[('ntfy', 'NTFY'), ('webhook', 'Custom webhook')], default='ntfy', max_length=255), + ), + migrations.AlterField( + model_name='notificationsettings', + name='service', + field=models.CharField(choices=[('ntfy', 'NTFY'), ('webhook', 'Custom webhook')], default='ntfy', max_length=255), + ), + ] diff --git a/core/models.py b/core/models.py index 986c686..715e4ca 100644 --- a/core/models.py +++ b/core/models.py @@ -174,6 +174,7 @@ class NotificationRule(models.Model): url = models.CharField(max_length=1024, null=True, blank=True) interval = models.IntegerField(choices=INTERVAL_CHOICES, default=0) window = models.CharField(max_length=255, null=True, blank=True) + amount = models.PositiveIntegerField(default=1, null=True, blank=True) enabled = models.BooleanField(default=True) data = models.TextField() match = models.JSONField(null=True, blank=True) @@ -197,6 +198,23 @@ class NotificationRule(models.Model): if isinstance(self.match, dict): return f"{sum(list(self.match.values()))}/{len(self.match)}" + def get_notification_settings(self): + """ + Get the notification settings for this rule. + Notification rule settings take priority. + """ + user_settings = self.user.get_notification_settings() + user_settings = user_settings.__dict__ + if self.priority is not None: + user_settings["priority"] = str(self.priority) + if self.topic is not None: + user_settings["topic"] = self.topic + if self.url is not None: + user_settings["url"] = self.url + if self.service is not None: + user_settings["service"] = self.service + return user_settings + class NotificationSettings(models.Model): user = models.OneToOneField(User, on_delete=models.CASCADE)