diff --git a/core/db/elastic.py b/core/db/elastic.py index c0326e2..28889a7 100644 --- a/core/db/elastic.py +++ b/core/db/elastic.py @@ -294,11 +294,10 @@ class ElasticsearchBackend(StorageBackend): self.log.error(f"Indexing failed: {result}") self.log.debug(f"Indexed {len(matches)} messages in ES") - async def schedule_query_results(self, rule_object): + def prepare_schedule_query(self, rule_object): """ - Helper to run a scheduled query with reduced functionality and async. + Helper to run a scheduled query with reduced functionality. """ - data = rule_object.parsed if "tags" in data: @@ -310,8 +309,6 @@ class ElasticsearchBackend(StorageBackend): query = data["query"][0] data["query"] = query - result_map = {} - add_bool = [] add_top = [] if "source" in data: @@ -352,34 +349,13 @@ class ElasticsearchBackend(StorageBackend): "avg": {"field": "sentiment"}, } } - for index in data["index"]: - if "message" in search_query: - self.log.error(f"Error parsing query: {search_query['message']}") - continue - response = await self.async_run_query( - rule_object.user, - search_query, - index=index, - ) - self.log.debug(f"Running scheduled query on {index}: {search_query}") - # self.log.debug(f"Response from scheduled query: {response}") - if isinstance(response, Exception): - error = response.info["error"]["root_cause"][0]["reason"] - self.log.error(f"Error running scheduled search: {error}") - raise QueryError(error) - if len(response["hits"]["hits"]) == 0: - # No results, skip - continue - meta, response = self.parse(response, meta=True) - # print("Parsed response", response) - if "message" in response: - self.log.error(f"Error running scheduled search: {response['message']}") - continue - result_map[index] = (meta, response) + return search_query - # Average aggregation check - # Could probably do this in elasticsearch + def schedule_check_aggregations(self, rule_object, result_map): + """ + Check the results of a scheduled query for aggregations. + """ for index, (meta, result) in result_map.items(): # Default to true, if no aggs are found, we still want to match match = True @@ -412,6 +388,71 @@ class ElasticsearchBackend(StorageBackend): return result_map + def schedule_query_results_test_sync(self, rule_object): + """ + Helper to run a scheduled query test with reduced functionality. + Sync version for running from Django forms. + Does not return results. + """ + data = rule_object.parsed + + search_query = self.prepare_schedule_query(rule_object) + for index in data["index"]: + if "message" in search_query: + self.log.error(f"Error parsing test query: {search_query['message']}") + continue + response = self.run_query( + rule_object.user, + search_query, + index=index, + ) + self.log.debug(f"Running scheduled test query on {index}: {search_query}") + # self.log.debug(f"Response from scheduled query: {response}") + if isinstance(response, Exception): + error = response.info["error"]["root_cause"][0]["reason"] + self.log.error(f"Error running test scheduled search: {error}") + raise QueryError(error) + + async def schedule_query_results(self, rule_object): + """ + Helper to run a scheduled query with reduced functionality and async. + """ + result_map = {} + data = rule_object.parsed + + search_query = self.prepare_schedule_query(rule_object) + + for index in data["index"]: + if "message" in search_query: + self.log.error(f"Error parsing query: {search_query['message']}") + continue + response = await self.async_run_query( + rule_object.user, + search_query, + index=index, + ) + self.log.debug(f"Running scheduled query on {index}: {search_query}") + # self.log.debug(f"Response from scheduled query: {response}") + if isinstance(response, Exception): + error = response.info["error"]["root_cause"][0]["reason"] + self.log.error(f"Error running scheduled search: {error}") + raise QueryError(error) + if len(response["hits"]["hits"]) == 0: + # No results, skip + continue + meta, response = self.parse(response, meta=True) + # print("Parsed response", response) + if "message" in response: + self.log.error(f"Error running scheduled search: {response['message']}") + continue + result_map[index] = (meta, response) + + # Average aggregation check + # Could probably do this in elasticsearch + result_map = self.schedule_check_aggregations(rule_object, result_map) + + return result_map + def query_results( self, request, @@ -424,7 +465,6 @@ class ElasticsearchBackend(StorageBackend): dedup_fields=None, tags=None, ): - add_bool = [] add_top = [] add_top_negative = [] diff --git a/core/forms.py b/core/forms.py index b4358e7..135e79e 100644 --- a/core/forms.py +++ b/core/forms.py @@ -116,7 +116,7 @@ class NotificationRuleForm(RestrictedFormMixin, ModelForm): "topic", "url", "service", - "send_empty", + "policy", "enabled", ) help_texts = { @@ -130,7 +130,7 @@ class NotificationRuleForm(RestrictedFormMixin, ModelForm): "interval": "How often to run the search. On demand evaluates messages as they are received, without running a scheduled search. The remaining options schedule a search of the database with the window below.", "window": "Time window to search: 1d, 1h, 1m, 1s, etc.", "amount": "Amount of matches to be returned for scheduled queries. Cannot be used with on-demand queries.", - "send_empty": "Send a notification if no matches are found.", + "policy": "When to trigger this policy.", } def clean(self): diff --git a/core/lib/rules.py b/core/lib/rules.py index 409295d..b44c937 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -11,7 +11,6 @@ except ImportError: from datetime import datetime import orjson -from asgiref.sync import async_to_sync from siphashc import siphash from core.lib.notify import sendmsg @@ -182,6 +181,7 @@ class NotificationRuleData(object): self.db = db self.data = self.cleaned_data.get("data") self.window = self.cleaned_data.get("window") + self.policy = self.cleaned_data.get("policy") self.parsed = None self.aggs = {} @@ -323,6 +323,9 @@ class NotificationRuleData(object): """ current_match = self.get_match(index, message) log.debug(f"Rule matched: {index} - current match: {current_match}") + + # Default policy: Trigger only when results change + if current_match is False: # Matched now, but not before if "matched" not in meta: @@ -342,6 +345,8 @@ class NotificationRuleData(object): """ current_match = self.get_match(index, message) log.debug(f"Rule matched: {index} - current match: {current_match}") + + # Default policy: Trigger only when results change if current_match is False: # Matched now, but not before if "matched" not in meta: @@ -361,6 +366,8 @@ class NotificationRuleData(object): """ current_match = self.get_match(index) log.debug(f"Rule not matched: {index} - current match: {current_match}") + + # Change policy: When there are no results following a successful run if current_match is True: # Matched before, but not now if self.object.send_empty: @@ -407,13 +414,10 @@ class NotificationRuleData(object): def test_schedule(self): """ Test the schedule query to ensure it is valid. - Run the query with the async_to_sync helper so we can call it from - a form. Raises an exception if the query is invalid. """ if self.db: - sync_schedule = async_to_sync(self.db.schedule_query_results) - sync_schedule(self) + self.db.schedule_query_results_test_sync(self) def validate_schedule_fields(self): """ @@ -476,9 +480,10 @@ class NotificationRuleData(object): raise RuleParseError( "Field tags cannot be used with on-demand rules", "data" ) - if self.cleaned_data["send_empty"]: + if self.policy != "default": raise RuleParseError( - "Field cannot be used with on-demand rules", "send_empty" + f"Cannot use {self.cleaned_data['policy']} policy with on-demand rules", + "policy", ) @property diff --git a/core/migrations/0026_notificationrule_policy_and_more.py b/core/migrations/0026_notificationrule_policy_and_more.py new file mode 100644 index 0000000..b7ac9ed --- /dev/null +++ b/core/migrations/0026_notificationrule_policy_and_more.py @@ -0,0 +1,28 @@ +# Generated by Django 4.1.5 on 2023-02-09 14:38 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0025_alter_notificationrule_id'), + ] + + operations = [ + migrations.AddField( + model_name='notificationrule', + name='policy', + field=models.CharField(choices=[('default', 'Only trigger for matched events'), ('change', 'Trigger only if no results found when they were last run'), ('always', 'Always trigger regardless of whether results are found')], default='default', max_length=255), + ), + migrations.AlterField( + model_name='notificationrule', + name='service', + field=models.CharField(choices=[('ntfy', 'NTFY'), ('webhook', 'Custom webhook'), ('none', 'Disabled')], default='ntfy', max_length=255), + ), + migrations.AlterField( + model_name='notificationsettings', + name='service', + field=models.CharField(choices=[('ntfy', 'NTFY'), ('webhook', 'Custom webhook'), ('none', 'Disabled')], default='ntfy', max_length=255), + ), + ] diff --git a/core/models.py b/core/models.py index 8be3a41..c32e313 100644 --- a/core/models.py +++ b/core/models.py @@ -42,6 +42,15 @@ SERVICE_CHOICES = ( ("none", "Disabled"), ) +POLICY_CHOICES = ( + ("default", "Default: Trigger only when there were no results last time"), + ( + "change", + "Change: Default + trigger when there are no results (if there were before)", + ), + ("always", "Always: Trigger on every run (not recommended for low intervals)"), +) + class Plan(models.Model): name = models.CharField(max_length=255, unique=True) @@ -193,6 +202,7 @@ class NotificationRule(models.Model): match = models.JSONField(null=True, blank=True) service = models.CharField(choices=SERVICE_CHOICES, max_length=255, default="ntfy") send_empty = models.BooleanField(default=False) + policy = models.CharField(choices=POLICY_CHOICES, max_length=255, default="default") def __str__(self): return f"{self.user} - {self.name}" diff --git a/core/util/logs.py b/core/util/logs.py index 045c95f..26cc230 100644 --- a/core/util/logs.py +++ b/core/util/logs.py @@ -43,7 +43,6 @@ class ColoredFormatter(logging.Formatter): def get_logger(name): - # Define the logging format FORMAT = "%(asctime)s %(levelname)18s $BOLD%(name)13s$RESET - %(message)s" COLOR_FORMAT = formatter_message(FORMAT, True) diff --git a/core/views/manage/threshold/irc.py b/core/views/manage/threshold/irc.py index 3b1f59d..b83c5d2 100644 --- a/core/views/manage/threshold/irc.py +++ b/core/views/manage/threshold/irc.py @@ -121,7 +121,6 @@ class ThresholdIRCNetworkRelayDel(SuperUserRequiredMixin, APIView): """ deleted = threshold.del_relay(net, num) if deleted["success"]: - message = f"Deleted relay {num}" message_class = "success" else: @@ -150,7 +149,6 @@ class ThresholdIRCNetworkRelayProvision(SuperUserRequiredMixin, APIView): """ provisioned = threshold.irc_provision_relay(net, num) if provisioned["success"]: - message = f"Provisioned relay {num}" message_class = "success" else: @@ -179,7 +177,6 @@ class ThresholdIRCNetworkRelayAuth(SuperUserRequiredMixin, APIView): """ provisioned = threshold.irc_enable_auth(net, num) if provisioned["success"]: - message = f"Enabled authentication on relay {num}" message_class = "success" else: