Fix notification delivery
This commit is contained in:
parent
df273a6009
commit
87c232d3f9
|
@ -350,8 +350,8 @@ class ElasticsearchBackend(StorageBackend):
|
|||
range_query = {
|
||||
"range": {
|
||||
"ts": {
|
||||
"gte": f"now-{rule_object.window}/d",
|
||||
"lte": "now/d",
|
||||
"gte": f"now-{rule_object.window}",
|
||||
"lte": "now",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,9 +56,11 @@ def webhook_sendmsg(**kwargs):
|
|||
msg = kwargs.get("msg", None)
|
||||
notification_settings = kwargs.get("notification_settings")
|
||||
url = notification_settings.get("url")
|
||||
headers = {"Content-type": "application/json"}
|
||||
try:
|
||||
requests.post(
|
||||
f"{url}",
|
||||
headers=headers,
|
||||
data=msg,
|
||||
)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
|
|
|
@ -9,6 +9,7 @@ except ImportError:
|
|||
from yaml import Loader, Dumper
|
||||
|
||||
import uuid
|
||||
from copy import deepcopy
|
||||
from datetime import datetime
|
||||
|
||||
import orjson
|
||||
|
@ -44,14 +45,18 @@ def format_ntfy(**kwargs):
|
|||
rule: The rule object, must be specified
|
||||
index: The index the rule matched on, can be None
|
||||
message: The message to send, can be None
|
||||
meta:
|
||||
matched: The matched fields, can be None
|
||||
total_hits: The total number of matches, optional
|
||||
"""
|
||||
rule = kwargs.get("rule")
|
||||
index = kwargs.get("index")
|
||||
message = kwargs.get("message")
|
||||
matched = kwargs.get("matched")
|
||||
total_hits = kwargs.get("total_hits", 0)
|
||||
|
||||
meta = kwargs.get("meta", {})
|
||||
total_hits = meta.get("total_hits", 0)
|
||||
matched = meta.get("matched")
|
||||
|
||||
if message:
|
||||
# Dump the message in YAML for readability
|
||||
messages_formatted = ""
|
||||
|
@ -88,25 +93,22 @@ def format_webhook(**kwargs):
|
|||
rule: The rule object, must be specified
|
||||
index: The index the rule matched on, can be None
|
||||
message: The message to send, can be None, but will be sent as None
|
||||
meta:
|
||||
matched: The matched fields, can be None, but will be sent as None
|
||||
total_hits: The total number of matches, optional
|
||||
notification_settings: The notification settings, must be specified
|
||||
priority: The priority of the message, optional
|
||||
topic: The topic of the message, optional
|
||||
"""
|
||||
rule = kwargs.get("rule")
|
||||
index = kwargs.get("index")
|
||||
# rule = kwargs.get("rule")
|
||||
# index = kwargs.get("index")
|
||||
message = kwargs.get("message")
|
||||
matched = kwargs.get("matched")
|
||||
total_hits = kwargs.get("total_hits", 0)
|
||||
meta = kwargs.get("meta")
|
||||
|
||||
notification_settings = kwargs.get("notification_settings")
|
||||
notify_message = {
|
||||
"rule_id": rule.id,
|
||||
"rule_name": rule.name,
|
||||
"matched": matched,
|
||||
"total_hits": total_hits,
|
||||
"index": index,
|
||||
"data": message,
|
||||
"meta": meta,
|
||||
}
|
||||
if "priority" in notification_settings:
|
||||
notify_message["priority"] = notification_settings["priority"]
|
||||
|
@ -144,20 +146,23 @@ def rule_notify(rule, index, message, meta=None):
|
|||
# Don't send anything
|
||||
return
|
||||
|
||||
# double sigh
|
||||
message_copy = deepcopy(message)
|
||||
for index, _ in enumerate(message_copy):
|
||||
if "meta" in message_copy[index]:
|
||||
del message_copy[index]["meta"]
|
||||
|
||||
# Create a cast we can reuse for the formatting helpers and sendmsg
|
||||
cast = {
|
||||
"title": title,
|
||||
"user": rule.user,
|
||||
"rule": rule,
|
||||
"index": index,
|
||||
"message": message,
|
||||
"message": message_copy,
|
||||
"notification_settings": notification_settings,
|
||||
}
|
||||
if meta:
|
||||
if "matched" in meta:
|
||||
cast["matched"] = meta["matched"]
|
||||
if "total_hits" in meta:
|
||||
cast["total_hits"] = meta["total_hits"]
|
||||
cast["meta"] = meta
|
||||
|
||||
if rule.service == "ntfy":
|
||||
cast["msg"] = format_ntfy(**cast)
|
||||
|
@ -341,8 +346,8 @@ class NotificationRuleData(object):
|
|||
:param index: the index to store the matches for
|
||||
:param matches: the matches to store
|
||||
"""
|
||||
new_matches = self.reform_matches(index, matches, meta, mode)
|
||||
await self.db.async_store_matches(new_matches)
|
||||
# new_matches = self.reform_matches(index, matches, meta, mode)
|
||||
await self.db.async_store_matches(matches)
|
||||
|
||||
def ingest_matches_sync(self, index, matches, meta, mode):
|
||||
"""
|
||||
|
@ -350,8 +355,8 @@ class NotificationRuleData(object):
|
|||
:param index: the index to store the matches for
|
||||
:param matches: the matches to store
|
||||
"""
|
||||
new_matches = self.reform_matches(index, matches, meta, mode)
|
||||
self.db.store_matches(new_matches)
|
||||
# new_matches = self.reform_matches(index, matches, meta, mode)
|
||||
self.db.store_matches(matches)
|
||||
|
||||
async def rule_matched(self, index, message, meta, mode):
|
||||
"""
|
||||
|
@ -386,8 +391,11 @@ class NotificationRuleData(object):
|
|||
if aggs_formatted:
|
||||
meta["matched_aggs"] = aggs_formatted
|
||||
|
||||
rule_notify(self.object, index, message, meta)
|
||||
meta["is_match"] = True
|
||||
self.store_match(index, message)
|
||||
|
||||
message = self.reform_matches(index, message, meta, mode)
|
||||
rule_notify(self.object, index, message, meta)
|
||||
await self.ingest_matches(index, message, meta, mode)
|
||||
|
||||
def rule_matched_sync(self, index, message, meta, mode):
|
||||
|
@ -423,12 +431,15 @@ class NotificationRuleData(object):
|
|||
if aggs_formatted:
|
||||
meta["matched_aggs"] = aggs_formatted
|
||||
|
||||
rule_notify(self.object, index, message, meta)
|
||||
meta["is_match"] = True
|
||||
self.store_match(index, message)
|
||||
|
||||
message = self.reform_matches(index, message, meta, mode)
|
||||
rule_notify(self.object, index, message, meta)
|
||||
self.ingest_matches_sync(index, message, meta, mode)
|
||||
|
||||
# No async helper for this one as we only need it for schedules
|
||||
async def rule_no_match(self, index=None, message=None):
|
||||
async def rule_no_match(self, index=None, message=None, mode=None):
|
||||
"""
|
||||
A rule has not matched.
|
||||
If the previous run did match, send a notification if configured to notify
|
||||
|
@ -455,11 +466,14 @@ class NotificationRuleData(object):
|
|||
|
||||
if self.policy in ["always", "change"]:
|
||||
# Never notify for empty matches on default policy
|
||||
rule_notify(self.object, index, "no_match", None)
|
||||
meta = {"msg": message, "is_match": False}
|
||||
matches = [{"msg": None}]
|
||||
message = self.reform_matches(index, matches, meta, mode)
|
||||
rule_notify(self.object, index, matches, meta)
|
||||
await self.ingest_matches(
|
||||
index=index,
|
||||
matches=[{"msg": None}],
|
||||
meta={"msg": message},
|
||||
matches=matches,
|
||||
meta=meta,
|
||||
mode="schedule",
|
||||
)
|
||||
|
||||
|
@ -472,12 +486,16 @@ class NotificationRuleData(object):
|
|||
response = await self.db.schedule_query_results(self)
|
||||
if not response:
|
||||
# No results in the result_map
|
||||
await self.rule_no_match(message="No response from database")
|
||||
await self.rule_no_match(
|
||||
message="No response from database", mode="schedule"
|
||||
)
|
||||
return
|
||||
for index, (meta, results) in response.items():
|
||||
if not results:
|
||||
# Falsy results, no matches
|
||||
await self.rule_no_match(index, message="No results for index")
|
||||
await self.rule_no_match(
|
||||
index, message="No results for index", mode="schedule"
|
||||
)
|
||||
continue
|
||||
|
||||
# Add the match values of all aggregations to a list
|
||||
|
@ -496,7 +514,9 @@ class NotificationRuleData(object):
|
|||
)
|
||||
continue
|
||||
# Default branch, since the happy path has a continue keyword
|
||||
await self.rule_no_match(index, message="Aggregation did not match")
|
||||
await self.rule_no_match(
|
||||
index, message="Aggregation did not match", mode="schedule"
|
||||
)
|
||||
|
||||
def test_schedule(self):
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue