2023-01-12 07:20:48 +00:00
|
|
|
from yaml import dump, load
|
2023-01-12 07:20:43 +00:00
|
|
|
from yaml.parser import ParserError
|
2023-01-12 07:20:48 +00:00
|
|
|
from yaml.scanner import ScannerError
|
|
|
|
|
2023-01-12 07:20:43 +00:00
|
|
|
try:
|
2023-01-12 07:20:48 +00:00
|
|
|
from yaml import CDumper as Dumper
|
|
|
|
from yaml import CLoader as Loader
|
2023-01-12 07:20:43 +00:00
|
|
|
except ImportError:
|
|
|
|
from yaml import Loader, Dumper
|
|
|
|
|
2023-01-15 23:02:13 +00:00
|
|
|
import orjson
|
2023-01-15 17:59:12 +00:00
|
|
|
from asgiref.sync import async_to_sync
|
|
|
|
|
2023-01-12 07:20:48 +00:00
|
|
|
from core.lib.notify import sendmsg
|
2023-01-15 17:59:12 +00:00
|
|
|
from core.lib.parsing import parse_index, parse_source
|
2023-01-12 07:20:48 +00:00
|
|
|
from core.util import logs
|
|
|
|
|
|
|
|
# Module-level logger, obtained from the project's logging helper under the
# name "rules".
log = logs.get_logger("rules")
|
|
|
|
|
2023-01-14 16:36:22 +00:00
|
|
|
# Number of seconds represented by each accepted window unit suffix.
SECONDS_PER_UNIT = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
}

# Largest window a rule may request, in seconds (30 days).
MAX_WINDOW = 30 * SECONDS_PER_UNIT["d"]
|
|
|
|
|
2023-01-12 07:20:48 +00:00
|
|
|
|
2023-01-14 14:45:19 +00:00
|
|
|
class RuleParseError(Exception):
    """
    Raised when a rule cannot be parsed or fails validation.

    Besides the usual exception message, the error carries ``field``: a label
    naming the offending input (raisers in this module pass "data", "window"
    or "amount").
    """

    def __init__(self, message, field):
        # Record the field label first, then let Exception store the message.
        self.field = field
        Exception.__init__(self, message)
|
|
|
|
|
|
|
|
|
2023-01-16 00:10:41 +00:00
|
|
|
def format_ntfy(**kwargs):
    """
    Format a message for ntfy.

    Recognised keyword arguments:
        rule: object exposing a ``name`` attribute.
        index: index the rule was evaluated against.
        message: a single message, a list of messages, or a falsy value.
        matched: optional mapping describing what matched.

    Returns the notification body encoded as UTF-8 bytes, laid out as
    ``"<rule name> on <index>: <matched>\\n<messages>"``.
    """
    rule = kwargs.get("rule")
    index = kwargs.get("index")
    message = kwargs.get("message")
    matched = kwargs.get("matched")

    if not message:
        messages_formatted = ""
    elif isinstance(message, list):
        # Dump each message in YAML for readability, leaving a blank line
        # after every entry.
        messages_formatted = "".join(
            dump(entry, Dumper=Dumper, default_flow_style=False) + "\n"
            for entry in message
        )
    else:
        messages_formatted = dump(message, Dumper=Dumper, default_flow_style=False)

    if matched:
        matched_text = ", ".join(f"{k}: {v}" for k, v in matched.items())
    else:
        matched_text = ""

    notify_message = f"{rule.name} on {index}: {matched_text}\n{messages_formatted}"
    return notify_message.encode("utf-8", "replace")
|
|
|
|
|
|
|
|
|
|
|
|
def format_webhook(**kwargs):
    """
    Format a message for a webhook.

    Recognised keyword arguments:
        rule: object exposing ``id`` and ``name`` attributes.
        index: index the rule was evaluated against.
        message: payload stored under the "data" key.
        matched: value stored under the "match" key.
        notification_settings: optional mapping; its "priority" and "topic"
            entries are copied into the payload when present.

    Returns the payload serialised with ``orjson.dumps``.
    """
    rule = kwargs.get("rule")
    index = kwargs.get("index")
    message = kwargs.get("message")
    matched = kwargs.get("matched")
    # Tolerate a missing/None settings mapping instead of failing on the
    # membership tests below.
    notification_settings = kwargs.get("notification_settings") or {}

    notify_message = {
        "rule_id": rule.id,
        "rule_name": rule.name,
        "match": matched,
        "index": index,
        "data": message,
    }
    for optional_key in ("priority", "topic"):
        if optional_key in notification_settings:
            notify_message[optional_key] = notification_settings[optional_key]

    return orjson.dumps(notify_message)
|
2023-01-15 23:02:13 +00:00
|
|
|
|
2023-01-16 00:10:41 +00:00
|
|
|
|
|
|
|
def rule_notify(rule, index, message, matched):
    """
    Send a notification for a rule through its configured service.

    Arguments:
        rule: rule object; ``name``, ``user``, ``service`` and
            ``get_notification_settings()`` are read from it.
        index: index the rule was evaluated against.
        message: the matching data, a falsy value, or the sentinel string
            "no_match" that ``NotificationRuleData.rule_no_match`` passes
            when a rule stops matching.
        matched: description of what matched, or None.

    Nothing is sent when the rule has no notification settings.
    """
    # "no_match" is a truthy sentinel, so it must be checked explicitly or the
    # title of a no-match notification would read "match".
    is_match = bool(message) and message != "no_match"
    word = "match" if is_match else "no match"
    title = f"Rule {rule.name} {word} on {index}"

    notification_settings = rule.get_notification_settings()
    if not notification_settings:
        return

    cast = {
        "title": title,
        "user": rule.user,
        "rule": rule,
        "index": index,
        "message": message,
        "matched": matched,
        "notification_settings": notification_settings,
    }
    if rule.service == "ntfy":
        cast["msg"] = format_ntfy(**cast)
    elif rule.service == "webhook":
        cast["msg"] = format_webhook(**cast)
    # NOTE(review): for any other service no "msg" entry is set before
    # sendmsg is called - confirm sendmsg copes with that.

    sendmsg(**cast)
|
2023-01-12 07:20:48 +00:00
|
|
|
|
|
|
|
|
2023-01-12 07:20:43 +00:00
|
|
|
class NotificationRuleData(object):
    """
    Parse, validate and evaluate the data of a notification rule.

    Construction runs the whole validation pipeline and raises RuleParseError
    on the first problem found. When built from a rule object rather than a
    dict, the instance can also track and store per-index match state on that
    object.
    """

    def __init__(self, user, cleaned_data, db):
        """
        Arguments:
            user: user owning the rule; used for permission checks.
            cleaned_data: either a dict of rule fields, or an object whose
                ``__dict__`` supplies those fields. In the latter case the
                object is kept as ``self.object``.
            db: backend exposing ``schedule_query_results``.

        Raises RuleParseError when any validation step fails.
        """
        self.user = user
        self.object = None

        # We are running live
        if not isinstance(cleaned_data, dict):
            self.object = cleaned_data
            cleaned_data = cleaned_data.__dict__

        self.cleaned_data = cleaned_data
        self.db = db
        self.data = self.cleaned_data.get("data")
        self.window = self.cleaned_data.get("window")
        self.parsed = None
        # Maps aggregation name -> (operator, threshold), filled in by
        # validate_schedule_fields.
        self.aggs = {}

        self.validate_user_permissions()

        self.parse_data()
        self.ensure_list()
        self.validate_permissions()
        self.validate_schedule_fields()
        self.validate_time_fields()
        if self.object is not None:
            self.populate_matched()

    def populate_matched(self):
        """
        Ensure the rule object has a match entry (default False) for every
        parsed index, then save the object.
        """
        # Reset anything that is not a dict (including None), mirroring
        # store_match.
        if not isinstance(self.object.match, dict):
            self.object.match = {}
        for index in self.parsed["index"]:
            if index not in self.object.match:
                self.object.match[index] = False
        self.object.save()

    def store_match(self, index, match):
        """
        Store a match result.

        When index is None the result is stored for every parsed index.
        """
        if not isinstance(self.object.match, dict):
            self.object.match = {}

        if index is None:
            for index_iter in self.parsed["index"]:
                self.object.match[index_iter] = match
        else:
            self.object.match[index] = match
        self.object.save()
        log.debug(f"Stored match: {index} - {match}")

    def get_match(self, index=None):
        """
        Get a match result for an index.

        Returns None when no usable match state is stored or the index is
        unknown. When index is None, returns whether any index currently
        matches.
        """
        if self.object.match is None:
            self.object.match = {}
            self.object.save()
            return None
        if not isinstance(self.object.match, dict):
            return None

        if index is None:
            # Check if we have any matches on all indices
            return any(self.object.match.values())

        return self.object.match.get(index)

    def format_aggs(self, aggs):
        """
        Format aggregations for the query.
        We have self.aggs, which contains:
        {"avg_sentiment": (">", 0.5)}
        and aggs, which contains:
        {"avg_sentiment": {"value": 0.6}}
        It's matched already, we just need to format it like so:
        {"avg_sentiment": "0.6>0.5"}

        Aggregations in aggs that the rule does not define are skipped.
        """
        new_aggs = {}
        for agg_name, agg in aggs.items():
            # The response may carry aggregations this rule never asked for;
            # only the ones configured in self.aggs can be formatted.
            if agg_name not in self.aggs:
                continue
            op, value = self.aggs[agg_name]
            new_aggs[agg_name] = f"{agg['value']}{op}{value}"

        return new_aggs

    def rule_matched(self, index, message, aggs):
        """
        A rule has matched.

        Notifies and stores the new state only when the stored state for the
        index is exactly False.
        """
        current_match = self.get_match(index)
        log.debug(f"Rule matched: {index} - current match: {current_match}")
        if current_match is False:
            # Matched now, but not before
            formatted_aggs = self.format_aggs(aggs)
            rule_notify(self.object, index, message, formatted_aggs)
            self.store_match(index, True)

    def rule_no_match(self, index=None):
        """
        A rule has not matched.

        Acts only when the stored state is exactly True; a notification is
        sent only if the rule object has send_empty set.
        """
        current_match = self.get_match(index)
        log.debug(f"Rule not matched: {index} - current match: {current_match}")
        if current_match is True:
            # Matched before, but not now
            if self.object.send_empty:
                rule_notify(self.object, index, "no_match", None)
            self.store_match(index, False)

    async def run_schedule(self):
        """
        Run the schedule query.

        The response is iterated as a mapping of index -> (aggs, results).
        An index counts as matched when it has results and every aggregation
        configured in self.aggs is present with a truthy "match" entry.
        """
        response = await self.db.schedule_query_results(self)
        if not response:
            # Nothing came back at all: no index can have matched.
            self.rule_no_match()
            return

        for index, (aggs, results) in response.items():
            if not results:
                self.rule_no_match(index)
                # Already handled; do not evaluate the aggregations as well.
                continue

            aggs_for_index = [
                aggs[agg_name]["match"]
                for agg_name in self.aggs
                if agg_name in aggs and "match" in aggs[agg_name]
            ]

            # All required aggs are present
            if len(aggs_for_index) == len(self.aggs) and all(aggs_for_index):
                # Ensure we only send notifications when the previous run
                # did not return any matches
                self.rule_matched(index, results[: self.object.amount], aggs)
                continue
            self.rule_no_match(index)

    def test_schedule(self):
        """
        Test the schedule query to ensure it is valid.
        """
        if self.db:
            sync_schedule = async_to_sync(self.db.schedule_query_results)
            sync_schedule(self)

    def validate_schedule_fields(self):
        """
        Ensure schedule fields are valid.
        index: can be a list, it will schedule one search per index.
        source: can be a list, it will be the filter for each search.
        tokens: can be list, it will ensure the message matches any token.
        msg: can be a list, it will ensure the message contains any msg.
        No other fields can be lists containing more than one item.

        Raises RuleParseError on any violation.
        """
        if self.is_schedule:
            allowed_list_fields = ["index", "source", "tokens", "msg"]
            for field, value in self.parsed.items():
                if field in allowed_list_fields:
                    continue
                if len(value) > 1:
                    raise RuleParseError(
                        (
                            f"For scheduled rules, field {field} cannot contain "
                            "more than one item"
                        ),
                        "data",
                    )
                # An empty list counts as empty too, rather than failing on
                # value[0].
                if not value or len(str(value[0])) == 0:
                    raise RuleParseError(f"Field {field} cannot be empty", "data")

            if "sentiment" in self.parsed:
                sentiment_error = (
                    "Sentiment field must be a comparison operator and then a "
                    "float: >0.02"
                )
                sentiment = str(self.parsed["sentiment"][0]).strip()
                # A whitespace-only value is empty after stripping.
                if not sentiment or sentiment[0] not in [">", "<", "="]:
                    raise RuleParseError(sentiment_error, "data")
                operator = sentiment[0]
                number = sentiment[1:]

                try:
                    number = float(number)
                except ValueError as err:
                    raise RuleParseError(sentiment_error, "data") from err
                self.aggs["avg_sentiment"] = (operator, number)

        else:
            if "query" in self.parsed:
                raise RuleParseError(
                    "Field query cannot be used with on-demand rules", "data"
                )
            if "tags" in self.parsed:
                raise RuleParseError(
                    "Field tags cannot be used with on-demand rules", "data"
                )

    @property
    def is_schedule(self):
        """
        True when the rule data has an "interval" entry that is not 0.
        """
        return self.cleaned_data.get("interval", 0) != 0

    def ensure_list(self):
        """
        Ensure all values are lists.
        """
        self.parsed = {
            field: value if isinstance(value, list) else [value]
            for field, value in self.parsed.items()
        }

    def validate_user_permissions(self):
        """
        Ensure the user can use notification rules.
        """
        if not self.user.has_perm("core.use_rules"):
            raise RuleParseError("User does not have permission to use rules", "data")

    def validate_time_fields(self):
        """
        Validate the interval and window fields.
        Prohibit window being specified with an ondemand interval.

        Also checks that amount is given exactly when the rule is not
        on-demand, and that the window is "<number><unit>" no longer than
        MAX_WINDOW seconds. Raises RuleParseError on any violation.
        """
        interval = self.cleaned_data.get("interval")
        window = self.cleaned_data.get("window")
        amount = self.cleaned_data.get("amount")

        on_demand = interval == 0
        if on_demand and window is not None:
            # Interval is on demand and window is specified
            # We can't have a window with on-demand rules
            raise RuleParseError(
                "Window cannot be specified with on-demand interval", "window"
            )

        if not on_demand and window is None:
            # Interval is not on demand and window is not specified
            # We can't have a non-on-demand interval without a window
            raise RuleParseError(
                "Window must be specified with non-on-demand interval", "window"
            )

        if not on_demand and amount is None:
            # Interval is not on demand and amount is not specified
            # We can't have a non-on-demand interval without an amount
            raise RuleParseError(
                "Amount must be specified with non-on-demand interval", "amount"
            )
        if on_demand and amount is not None:
            # Interval is on demand and amount is specified
            # We can't have an amount with on-demand rules
            raise RuleParseError(
                "Amount cannot be specified with on-demand interval", "amount"
            )

        if window is not None:
            # The window is "<digits><unit>": everything but the last
            # character is the count, the last character is the unit.
            window_number = window[:-1]
            if not window_number.isdigit():
                raise RuleParseError("Window prefix must be a number", "window")
            window_number = int(window_number)
            window_unit = window[-1]
            if window_unit not in SECONDS_PER_UNIT:
                raise RuleParseError(
                    (
                        "Window unit must be one of "
                        f"{', '.join(SECONDS_PER_UNIT.keys())},"
                        f" not '{window_unit}'"
                    ),
                    "window",
                )
            window_seconds = window_number * SECONDS_PER_UNIT[window_unit]
            if window_seconds > MAX_WINDOW:
                raise RuleParseError(
                    f"Window cannot be larger than {MAX_WINDOW} seconds (30 days)",
                    "window",
                )

    def validate_permissions(self):
        """
        Validate permissions for the source and index variables.

        Each listed index/source is passed to parse_index/parse_source with
        raise_error=True. When a field is absent, the value returned for an
        empty query is stored as its single-item list.
        """
        if "index" in self.parsed:
            index = self.parsed["index"]
            if isinstance(index, list):
                for i in index:
                    parse_index(self.user, {"index": i}, raise_error=True)
        else:
            # Get the default value for the user if not present
            index = parse_index(self.user, {}, raise_error=True)
            self.parsed["index"] = [index]

        if "source" in self.parsed:
            source = self.parsed["source"]
            if isinstance(source, list):
                for i in source:
                    parse_source(self.user, {"source": i}, raise_error=True)
        else:
            # Get the default value for the user if not present
            source = parse_source(self.user, {}, raise_error=True)
            self.parsed["source"] = [source]

    def parse_data(self):
        """
        Parse the data in the text field to YAML.

        Raises RuleParseError (field "data") when the text is not valid YAML
        or does not parse to a mapping.
        """
        try:
            # NOTE(review): Loader/CLoader is PyYAML's full loader, which can
            # construct arbitrary Python objects. This text looks
            # user-supplied - consider SafeLoader/CSafeLoader; confirm no
            # rule relies on Python-specific tags first.
            self.parsed = load(self.data, Loader=Loader)
        except (ScannerError, ParserError) as e:
            raise RuleParseError(f"Invalid YAML: {e}", "data") from e
        # Empty text parses to None and bare scalars/lists parse to non-dicts;
        # every later step needs a mapping of field -> value.
        if not isinstance(self.parsed, dict):
            raise RuleParseError("Rule data must be a YAML mapping of fields", "data")

    def __str__(self):
        """
        Return the parsed rule data dumped back to YAML.
        """
        return dump(self.parsed, Dumper=Dumper)

    def get_data(self):
        """
        Return the parsed rule data.
        """
        return self.parsed
|