From 87c232d3f9f16f3ea746d22a2a46a070991446b3 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 10 Feb 2023 22:52:59 +0000 Subject: [PATCH] Fix notification delivery --- core/db/elastic.py | 4 +-- core/lib/notify.py | 2 ++ core/lib/rules.py | 86 ++++++++++++++++++++++++++++------------------ 3 files changed, 57 insertions(+), 35 deletions(-) diff --git a/core/db/elastic.py b/core/db/elastic.py index 700d727..e736218 100644 --- a/core/db/elastic.py +++ b/core/db/elastic.py @@ -350,8 +350,8 @@ class ElasticsearchBackend(StorageBackend): range_query = { "range": { "ts": { - "gte": f"now-{rule_object.window}/d", - "lte": "now/d", + "gte": f"now-{rule_object.window}", + "lte": "now", } } } diff --git a/core/lib/notify.py b/core/lib/notify.py index 45b366a..6982309 100644 --- a/core/lib/notify.py +++ b/core/lib/notify.py @@ -56,9 +56,11 @@ def webhook_sendmsg(**kwargs): msg = kwargs.get("msg", None) notification_settings = kwargs.get("notification_settings") url = notification_settings.get("url") + headers = {"Content-type": "application/json"} try: requests.post( f"{url}", + headers=headers, data=msg, ) except requests.exceptions.ConnectionError as e: diff --git a/core/lib/rules.py b/core/lib/rules.py index a4ea864..e5b59cf 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -9,6 +9,7 @@ except ImportError: from yaml import Loader, Dumper import uuid +from copy import deepcopy from datetime import datetime import orjson @@ -44,14 +45,18 @@ def format_ntfy(**kwargs): rule: The rule object, must be specified index: The index the rule matched on, can be None message: The message to send, can be None - matched: The matched fields, can be None - total_hits: The total number of matches, optional + meta: + matched: The matched fields, can be None + total_hits: The total number of matches, optional """ rule = kwargs.get("rule") index = kwargs.get("index") message = kwargs.get("message") - matched = kwargs.get("matched") - total_hits = kwargs.get("total_hits", 0) + + meta = kwargs.get("meta", {}) + total_hits = meta.get("total_hits", 0) + matched = meta.get("matched") + if message: # Dump the message in YAML for readability messages_formatted = "" @@ -88,25 +93,22 @@ def format_webhook(**kwargs): rule: The rule object, must be specified index: The index the rule matched on, can be None message: The message to send, can be None, but will be sent as None - matched: The matched fields, can be None, but will be sent as None - total_hits: The total number of matches, optional + meta: + matched: The matched fields, can be None, but will be sent as None + total_hits: The total number of matches, optional notification_settings: The notification settings, must be specified priority: The priority of the message, optional topic: The topic of the message, optional """ - rule = kwargs.get("rule") - index = kwargs.get("index") + # rule = kwargs.get("rule") + # index = kwargs.get("index") message = kwargs.get("message") - matched = kwargs.get("matched") - total_hits = kwargs.get("total_hits", 0) + meta = kwargs.get("meta") + notification_settings = kwargs.get("notification_settings") notify_message = { - "rule_id": rule.id, - "rule_name": rule.name, - "matched": matched, - "total_hits": total_hits, - "index": index, "data": message, + "meta": meta, } if "priority" in notification_settings: notify_message["priority"] = notification_settings["priority"] @@ -144,20 +146,23 @@ def rule_notify(rule, index, message, meta=None): # Don't send anything return + # double sigh + message_copy = deepcopy(message) + for index, _ in enumerate(message_copy): + if "meta" in message_copy[index]: + del message_copy[index]["meta"] + # Create a cast we can reuse for the formatting helpers and sendmsg cast = { "title": title, "user": rule.user, "rule": rule, "index": index, - "message": message, + "message": message_copy, "notification_settings": notification_settings, } if meta: - if "matched" in meta: - cast["matched"] = meta["matched"] - if "total_hits" in meta: - cast["total_hits"] = meta["total_hits"] + cast["meta"] = meta if rule.service == "ntfy": cast["msg"] = format_ntfy(**cast) @@ -341,8 +346,8 @@ class NotificationRuleData(object): :param index: the index to store the matches for :param matches: the matches to store """ - new_matches = self.reform_matches(index, matches, meta, mode) - await self.db.async_store_matches(new_matches) + # new_matches = self.reform_matches(index, matches, meta, mode) + await self.db.async_store_matches(matches) def ingest_matches_sync(self, index, matches, meta, mode): """ @@ -350,8 +355,8 @@ class NotificationRuleData(object): :param index: the index to store the matches for :param matches: the matches to store """ - new_matches = self.reform_matches(index, matches, meta, mode) - self.db.store_matches(new_matches) + # new_matches = self.reform_matches(index, matches, meta, mode) + self.db.store_matches(matches) async def rule_matched(self, index, message, meta, mode): """ @@ -386,8 +391,11 @@ class NotificationRuleData(object): if aggs_formatted: meta["matched_aggs"] = aggs_formatted - rule_notify(self.object, index, message, meta) + meta["is_match"] = True self.store_match(index, message) + + message = self.reform_matches(index, message, meta, mode) + rule_notify(self.object, index, message, meta) await self.ingest_matches(index, message, meta, mode) def rule_matched_sync(self, index, message, meta, mode): @@ -423,12 +431,15 @@ class NotificationRuleData(object): if aggs_formatted: meta["matched_aggs"] = aggs_formatted - rule_notify(self.object, index, message, meta) + meta["is_match"] = True self.store_match(index, message) + + message = self.reform_matches(index, message, meta, mode) + rule_notify(self.object, index, message, meta) self.ingest_matches_sync(index, message, meta, mode) # No async helper for this one as we only need it for schedules - async def rule_no_match(self, index=None, message=None): + async def rule_no_match(self, index=None, message=None, mode=None): """ A rule has not matched. If the previous run did match, send a notification if configured to notify @@ -455,11 +466,14 @@ class NotificationRuleData(object): if self.policy in ["always", "change"]: # Never notify for empty matches on default policy - rule_notify(self.object, index, "no_match", None) + meta = {"msg": message, "is_match": False} + matches = [{"msg": None}] + message = self.reform_matches(index, matches, meta, mode) + rule_notify(self.object, index, matches, meta) await self.ingest_matches( index=index, - matches=[{"msg": None}], - meta={"msg": message}, + matches=matches, + meta=meta, mode="schedule", ) @@ -472,12 +486,16 @@ class NotificationRuleData(object): response = await self.db.schedule_query_results(self) if not response: # No results in the result_map - await self.rule_no_match(message="No response from database") + await self.rule_no_match( + message="No response from database", mode="schedule" + ) return for index, (meta, results) in response.items(): if not results: # Falsy results, no matches - await self.rule_no_match(index, message="No results for index") + await self.rule_no_match( + index, message="No results for index", mode="schedule" + ) continue # Add the match values of all aggregations to a list @@ -496,7 +514,9 @@ class NotificationRuleData(object): ) continue # Default branch, since the happy path has a continue keyword - await self.rule_no_match(index, message="Aggregation did not match") + await self.rule_no_match( + index, message="Aggregation did not match", mode="schedule" + ) def test_schedule(self): """