Compare commits
No commits in common. "3d6c8d618fb18a53f7830e11c3057be23149303e" and "c08ecc036f40d83f41ad574f577676c6031eb96d" have entirely different histories.
3d6c8d618f
...
c08ecc036f
@ -55,7 +55,7 @@ class StorageBackend(ABC):
|
|||||||
self.log.info(f"Initialising storage backend {name}")
|
self.log.info(f"Initialising storage backend {name}")
|
||||||
|
|
||||||
self.initialise_caching()
|
self.initialise_caching()
|
||||||
# self.initialise()
|
self.initialise()
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def initialise(self, **kwargs):
|
def initialise(self, **kwargs):
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
# from datetime import datetime, timedelta
|
# from datetime import datetime, timedelta
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from elasticsearch import AsyncElasticsearch, Elasticsearch
|
from elasticsearch import Elasticsearch
|
||||||
from elasticsearch.exceptions import NotFoundError, RequestError
|
from elasticsearch.exceptions import NotFoundError, RequestError
|
||||||
|
|
||||||
from core.db import StorageBackend, add_defaults
|
from core.db import StorageBackend, add_defaults
|
||||||
@ -24,12 +24,10 @@ from core.lib.parsing import (
|
|||||||
class ElasticsearchBackend(StorageBackend):
|
class ElasticsearchBackend(StorageBackend):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__("Elasticsearch")
|
super().__init__("Elasticsearch")
|
||||||
self.client = None
|
|
||||||
self.async_client = None
|
|
||||||
|
|
||||||
def initialise(self, **kwargs):
|
def initialise(self, **kwargs):
|
||||||
"""
|
"""
|
||||||
Inititialise the Elasticsearch API endpoint.
|
Inititialise the Elastuicsearch API endpoint.
|
||||||
"""
|
"""
|
||||||
auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
|
auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
|
||||||
client = Elasticsearch(
|
client = Elasticsearch(
|
||||||
@ -37,16 +35,6 @@ class ElasticsearchBackend(StorageBackend):
|
|||||||
)
|
)
|
||||||
self.client = client
|
self.client = client
|
||||||
|
|
||||||
async def async_initialise(self, **kwargs):
|
|
||||||
"""
|
|
||||||
Inititialise the Elasticsearch API endpoint in async mode.
|
|
||||||
"""
|
|
||||||
auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
|
|
||||||
client = AsyncElasticsearch(
|
|
||||||
settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False
|
|
||||||
)
|
|
||||||
self.async_client = client
|
|
||||||
|
|
||||||
def construct_context_query(
|
def construct_context_query(
|
||||||
self, index, net, channel, src, num, size, type=None, nicks=None
|
self, index, net, channel, src, num, size, type=None, nicks=None
|
||||||
):
|
):
|
||||||
@ -198,8 +186,6 @@ class ElasticsearchBackend(StorageBackend):
|
|||||||
Accept fields and size, for the fields we want to match and the
|
Accept fields and size, for the fields we want to match and the
|
||||||
number of results to return.
|
number of results to return.
|
||||||
"""
|
"""
|
||||||
if self.client is None:
|
|
||||||
self.initialise()
|
|
||||||
index = kwargs.get("index")
|
index = kwargs.get("index")
|
||||||
try:
|
try:
|
||||||
response = self.client.search(body=search_query, index=index)
|
response = self.client.search(body=search_query, index=index)
|
||||||
@ -219,11 +205,9 @@ class ElasticsearchBackend(StorageBackend):
|
|||||||
Accept fields and size, for the fields we want to match and the
|
Accept fields and size, for the fields we want to match and the
|
||||||
number of results to return.
|
number of results to return.
|
||||||
"""
|
"""
|
||||||
if self.async_client is None:
|
|
||||||
await self.async_initialise()
|
|
||||||
index = kwargs.get("index")
|
index = kwargs.get("index")
|
||||||
try:
|
try:
|
||||||
response = await self.async_client.search(body=search_query, index=index)
|
response = self.client.search(body=search_query, index=index)
|
||||||
except RequestError as err:
|
except RequestError as err:
|
||||||
print("Elasticsearch error", err)
|
print("Elasticsearch error", err)
|
||||||
return err
|
return err
|
||||||
|
@ -9,17 +9,6 @@ log = logs.get_logger(__name__)
|
|||||||
|
|
||||||
# Actual function to send a message to a topic
|
# Actual function to send a message to a topic
|
||||||
def ntfy_sendmsg(**kwargs):
|
def ntfy_sendmsg(**kwargs):
|
||||||
"""
|
|
||||||
Send a message to a topic using NTFY.
|
|
||||||
kwargs:
|
|
||||||
msg: Message to send, must be specified
|
|
||||||
notification_settings: Notification settings, must be specified
|
|
||||||
url: URL to NTFY server, can be None to use default
|
|
||||||
topic: Topic to send message to, must be specified
|
|
||||||
priority: Priority of message, optional
|
|
||||||
title: Title of message, optional
|
|
||||||
tags: Tags to add to message, optional
|
|
||||||
"""
|
|
||||||
msg = kwargs.get("msg", None)
|
msg = kwargs.get("msg", None)
|
||||||
notification_settings = kwargs.get("notification_settings")
|
notification_settings = kwargs.get("notification_settings")
|
||||||
|
|
||||||
@ -47,12 +36,6 @@ def ntfy_sendmsg(**kwargs):
|
|||||||
|
|
||||||
|
|
||||||
def webhook_sendmsg(**kwargs):
|
def webhook_sendmsg(**kwargs):
|
||||||
"""
|
|
||||||
Send a message to a webhook.
|
|
||||||
kwargs:
|
|
||||||
msg: Message to send, must be specified
|
|
||||||
notification_settings: Notification settings, must be specified
|
|
||||||
url: URL to webhook, must be specified"""
|
|
||||||
msg = kwargs.get("msg", None)
|
msg = kwargs.get("msg", None)
|
||||||
notification_settings = kwargs.get("notification_settings")
|
notification_settings = kwargs.get("notification_settings")
|
||||||
url = notification_settings.get("url")
|
url = notification_settings.get("url")
|
||||||
@ -67,26 +50,6 @@ def webhook_sendmsg(**kwargs):
|
|||||||
|
|
||||||
# Sendmsg helper to send a message to a user's notification settings
|
# Sendmsg helper to send a message to a user's notification settings
|
||||||
def sendmsg(**kwargs):
|
def sendmsg(**kwargs):
|
||||||
"""
|
|
||||||
Send a message to a user's notification settings.
|
|
||||||
Fetches the user's default notification settings if not specified.
|
|
||||||
kwargs:
|
|
||||||
user: User to send message to, must be specified
|
|
||||||
notification_settings: Notification settings, optional
|
|
||||||
service: Notification service to use
|
|
||||||
|
|
||||||
kwargs for both services:
|
|
||||||
msg: Message to send, must be specified
|
|
||||||
notification_settings: Notification settings, must be specified
|
|
||||||
url: URL to NTFY server, can be None to use default
|
|
||||||
|
|
||||||
extra kwargs for ntfy:
|
|
||||||
title: Title of message, optional
|
|
||||||
tags: Tags to add to message, optional
|
|
||||||
notification_settings: Notification settings, must be specified
|
|
||||||
topic: Topic to send message to, must be specified
|
|
||||||
priority: Priority of message, optional
|
|
||||||
"""
|
|
||||||
user = kwargs.get("user", None)
|
user = kwargs.get("user", None)
|
||||||
notification_settings = kwargs.get(
|
notification_settings = kwargs.get(
|
||||||
"notification_settings", user.get_notification_settings().__dict__
|
"notification_settings", user.get_notification_settings().__dict__
|
||||||
|
@ -31,14 +31,6 @@ class RuleParseError(Exception):
|
|||||||
def format_ntfy(**kwargs):
|
def format_ntfy(**kwargs):
|
||||||
"""
|
"""
|
||||||
Format a message for ntfy.
|
Format a message for ntfy.
|
||||||
If the message is a list, it will be joined with newlines.
|
|
||||||
If the message is None, it will be replaced with an empty string.
|
|
||||||
If specified, `matched` will be pretty-printed in the first line.
|
|
||||||
kwargs:
|
|
||||||
rule: The rule object, must be specified
|
|
||||||
index: The index the rule matched on, can be None
|
|
||||||
message: The message to send, can be None
|
|
||||||
matched: The matched fields, can be None
|
|
||||||
"""
|
"""
|
||||||
rule = kwargs.get("rule")
|
rule = kwargs.get("rule")
|
||||||
index = kwargs.get("index")
|
index = kwargs.get("index")
|
||||||
@ -48,9 +40,9 @@ def format_ntfy(**kwargs):
|
|||||||
# Dump the message in YAML for readability
|
# Dump the message in YAML for readability
|
||||||
messages_formatted = ""
|
messages_formatted = ""
|
||||||
if isinstance(message, list):
|
if isinstance(message, list):
|
||||||
for message_iter in message:
|
for message in message:
|
||||||
messages_formatted += dump(
|
messages_formatted += dump(
|
||||||
message_iter, Dumper=Dumper, default_flow_style=False
|
message, Dumper=Dumper, default_flow_style=False
|
||||||
)
|
)
|
||||||
messages_formatted += "\n"
|
messages_formatted += "\n"
|
||||||
else:
|
else:
|
||||||
@ -72,17 +64,6 @@ def format_ntfy(**kwargs):
|
|||||||
def format_webhook(**kwargs):
|
def format_webhook(**kwargs):
|
||||||
"""
|
"""
|
||||||
Format a message for a webhook.
|
Format a message for a webhook.
|
||||||
Adds some metadata to the message that would normally be only in
|
|
||||||
notification_settings.
|
|
||||||
Dumps the message in JSON.
|
|
||||||
kwargs:
|
|
||||||
rule: The rule object, must be specified
|
|
||||||
index: The index the rule matched on, can be None
|
|
||||||
message: The message to send, can be None, but will be sent as None
|
|
||||||
matched: The matched fields, can be None, but will be sent as None
|
|
||||||
notification_settings: The notification settings, must be specified
|
|
||||||
priority: The priority of the message, optional
|
|
||||||
topic: The topic of the message, optional
|
|
||||||
"""
|
"""
|
||||||
rule = kwargs.get("rule")
|
rule = kwargs.get("rule")
|
||||||
index = kwargs.get("index")
|
index = kwargs.get("index")
|
||||||
@ -106,30 +87,14 @@ def format_webhook(**kwargs):
|
|||||||
|
|
||||||
|
|
||||||
def rule_notify(rule, index, message, matched):
|
def rule_notify(rule, index, message, matched):
|
||||||
"""
|
|
||||||
Send a notification for a matching rule.
|
|
||||||
Gets the notification settings for the rule.
|
|
||||||
Runs the formatting helpers for the service.
|
|
||||||
:param rule: The rule object, must be specified
|
|
||||||
:param index: The index the rule matched on, can be None
|
|
||||||
:param message: The message to send, can be None
|
|
||||||
:param matched: The matched fields, can be None
|
|
||||||
"""
|
|
||||||
# If there is no message, don't say anything matched
|
|
||||||
if message:
|
if message:
|
||||||
word = "match"
|
word = "match"
|
||||||
else:
|
else:
|
||||||
word = "no match"
|
word = "no match"
|
||||||
|
|
||||||
title = f"Rule {rule.name} {word} on {index}"
|
title = f"Rule {rule.name} {word} on {index}"
|
||||||
|
|
||||||
# The user notification settings are merged in with this
|
|
||||||
notification_settings = rule.get_notification_settings()
|
notification_settings = rule.get_notification_settings()
|
||||||
if not notification_settings:
|
if not notification_settings:
|
||||||
# No/invalid notification settings, don't send anything
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# Create a cast we can reuse for the formatting helpers and sendmsg
|
|
||||||
cast = {
|
cast = {
|
||||||
"title": title,
|
"title": title,
|
||||||
"user": rule.user,
|
"user": rule.user,
|
||||||
@ -139,7 +104,6 @@ def rule_notify(rule, index, message, matched):
|
|||||||
"matched": matched,
|
"matched": matched,
|
||||||
"notification_settings": notification_settings,
|
"notification_settings": notification_settings,
|
||||||
}
|
}
|
||||||
|
|
||||||
if rule.service == "ntfy":
|
if rule.service == "ntfy":
|
||||||
cast["msg"] = format_ntfy(**cast)
|
cast["msg"] = format_ntfy(**cast)
|
||||||
|
|
||||||
@ -154,7 +118,7 @@ class NotificationRuleData(object):
|
|||||||
self.user = user
|
self.user = user
|
||||||
self.object = None
|
self.object = None
|
||||||
|
|
||||||
# We are running live and have been passed a database object
|
# We are running live
|
||||||
if not isinstance(cleaned_data, dict):
|
if not isinstance(cleaned_data, dict):
|
||||||
self.object = cleaned_data
|
self.object = cleaned_data
|
||||||
cleaned_data = cleaned_data.__dict__
|
cleaned_data = cleaned_data.__dict__
|
||||||
@ -177,10 +141,6 @@ class NotificationRuleData(object):
|
|||||||
self.populate_matched()
|
self.populate_matched()
|
||||||
|
|
||||||
def populate_matched(self):
|
def populate_matched(self):
|
||||||
"""
|
|
||||||
On first creation, the match field is None. We need to populate it with
|
|
||||||
a dictionary containing the index names as keys and False as values.
|
|
||||||
"""
|
|
||||||
if self.object.match is None:
|
if self.object.match is None:
|
||||||
self.object.match = {}
|
self.object.match = {}
|
||||||
for index in self.parsed["index"]:
|
for index in self.parsed["index"]:
|
||||||
@ -191,9 +151,6 @@ class NotificationRuleData(object):
|
|||||||
def store_match(self, index, match):
|
def store_match(self, index, match):
|
||||||
"""
|
"""
|
||||||
Store a match result.
|
Store a match result.
|
||||||
Accepts None for the index to set all indices.
|
|
||||||
:param index: the index to store the match for, can be None
|
|
||||||
:param match: True or False, indicating if the rule matched
|
|
||||||
"""
|
"""
|
||||||
if self.object.match is None:
|
if self.object.match is None:
|
||||||
self.object.match = {}
|
self.object.match = {}
|
||||||
@ -211,8 +168,6 @@ class NotificationRuleData(object):
|
|||||||
def get_match(self, index=None):
|
def get_match(self, index=None):
|
||||||
"""
|
"""
|
||||||
Get a match result for an index.
|
Get a match result for an index.
|
||||||
If the index is None, it will return True if any index has a match.
|
|
||||||
:param index: the index to get the match for, can be None
|
|
||||||
"""
|
"""
|
||||||
if self.object.match is None:
|
if self.object.match is None:
|
||||||
self.object.match = {}
|
self.object.match = {}
|
||||||
@ -236,8 +191,6 @@ class NotificationRuleData(object):
|
|||||||
{"avg_sentiment": {"value": 0.6}}
|
{"avg_sentiment": {"value": 0.6}}
|
||||||
It's matched already, we just need to format it like so:
|
It's matched already, we just need to format it like so:
|
||||||
{"avg_sentiment": "0.06>0.5"}
|
{"avg_sentiment": "0.06>0.5"}
|
||||||
:param aggs: the aggregations to format
|
|
||||||
:return: the formatted aggregations
|
|
||||||
"""
|
"""
|
||||||
new_aggs = {}
|
new_aggs = {}
|
||||||
for agg_name, agg in aggs.items():
|
for agg_name, agg in aggs.items():
|
||||||
@ -250,11 +203,6 @@ class NotificationRuleData(object):
|
|||||||
def rule_matched(self, index, message, aggs):
|
def rule_matched(self, index, message, aggs):
|
||||||
"""
|
"""
|
||||||
A rule has matched.
|
A rule has matched.
|
||||||
If the previous run did not match, send a notification after formatting
|
|
||||||
the aggregations.
|
|
||||||
:param index: the index the rule matched on
|
|
||||||
:param message: the message object that matched
|
|
||||||
:param aggs: the aggregations that matched
|
|
||||||
"""
|
"""
|
||||||
current_match = self.get_match(index)
|
current_match = self.get_match(index)
|
||||||
log.debug(f"Rule matched: {index} - current match: {current_match}")
|
log.debug(f"Rule matched: {index} - current match: {current_match}")
|
||||||
@ -267,10 +215,6 @@ class NotificationRuleData(object):
|
|||||||
def rule_no_match(self, index=None):
|
def rule_no_match(self, index=None):
|
||||||
"""
|
"""
|
||||||
A rule has not matched.
|
A rule has not matched.
|
||||||
If the previous run did match, send a notification if configured to notify
|
|
||||||
for empty matches.
|
|
||||||
:param index: the index the rule did not match on, can be None
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
current_match = self.get_match(index)
|
current_match = self.get_match(index)
|
||||||
log.debug(f"Rule not matched: {index} - current match: {current_match}")
|
log.debug(f"Rule not matched: {index} - current match: {current_match}")
|
||||||
@ -283,19 +227,14 @@ class NotificationRuleData(object):
|
|||||||
async def run_schedule(self):
|
async def run_schedule(self):
|
||||||
"""
|
"""
|
||||||
Run the schedule query.
|
Run the schedule query.
|
||||||
Get the results from the database, and check if the rule has matched.
|
|
||||||
Check if all of the required aggregations have matched.
|
|
||||||
"""
|
"""
|
||||||
response = await self.db.schedule_query_results(self)
|
response = await self.db.schedule_query_results(self)
|
||||||
if not response:
|
if not response:
|
||||||
# No results in the result_map
|
|
||||||
self.rule_no_match()
|
self.rule_no_match()
|
||||||
for index, (aggs, results) in response.items():
|
for index, (aggs, results) in response.items():
|
||||||
if not results:
|
if not results:
|
||||||
# Falsy results, no matches
|
|
||||||
self.rule_not_matched(index)
|
self.rule_not_matched(index)
|
||||||
|
|
||||||
# Add the match values of all aggregations to a list
|
|
||||||
aggs_for_index = []
|
aggs_for_index = []
|
||||||
for agg_name in self.aggs.keys():
|
for agg_name in self.aggs.keys():
|
||||||
if agg_name in aggs:
|
if agg_name in aggs:
|
||||||
@ -305,18 +244,15 @@ class NotificationRuleData(object):
|
|||||||
# All required aggs are present
|
# All required aggs are present
|
||||||
if len(aggs_for_index) == len(self.aggs.keys()):
|
if len(aggs_for_index) == len(self.aggs.keys()):
|
||||||
if all(aggs_for_index):
|
if all(aggs_for_index):
|
||||||
# All aggs have matched
|
# Ensure we only send notifications when the previous run
|
||||||
|
# did not return any matches
|
||||||
self.rule_matched(index, results[: self.object.amount], aggs)
|
self.rule_matched(index, results[: self.object.amount], aggs)
|
||||||
continue
|
continue
|
||||||
# Default branch, since the happy path has a continue keyword
|
self.rule_not_matched(index)
|
||||||
self.rule_no_match(index)
|
|
||||||
|
|
||||||
def test_schedule(self):
|
def test_schedule(self):
|
||||||
"""
|
"""
|
||||||
Test the schedule query to ensure it is valid.
|
Test the schedule query to ensure it is valid.
|
||||||
Run the query with the async_to_sync helper so we can call it from
|
|
||||||
a form.
|
|
||||||
Raises an exception if the query is invalid.
|
|
||||||
"""
|
"""
|
||||||
if self.db:
|
if self.db:
|
||||||
sync_schedule = async_to_sync(self.db.schedule_query_results)
|
sync_schedule = async_to_sync(self.db.schedule_query_results)
|
||||||
@ -330,7 +266,6 @@ class NotificationRuleData(object):
|
|||||||
tokens: can be list, it will ensure the message matches any token.
|
tokens: can be list, it will ensure the message matches any token.
|
||||||
msg: can be a list, it will ensure the message contains any msg.
|
msg: can be a list, it will ensure the message contains any msg.
|
||||||
No other fields can be lists containing more than one item.
|
No other fields can be lists containing more than one item.
|
||||||
:raises RuleParseError: if the fields are invalid
|
|
||||||
"""
|
"""
|
||||||
is_schedule = self.is_schedule
|
is_schedule = self.is_schedule
|
||||||
|
|
||||||
@ -386,10 +321,6 @@ class NotificationRuleData(object):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def is_schedule(self):
|
def is_schedule(self):
|
||||||
"""
|
|
||||||
Check if the rule is a schedule rule.
|
|
||||||
:return: True if the rule is a schedule rule, False otherwise
|
|
||||||
"""
|
|
||||||
if "interval" in self.cleaned_data:
|
if "interval" in self.cleaned_data:
|
||||||
if self.cleaned_data["interval"] != 0:
|
if self.cleaned_data["interval"] != 0:
|
||||||
return True
|
return True
|
||||||
@ -397,8 +328,7 @@ class NotificationRuleData(object):
|
|||||||
|
|
||||||
def ensure_list(self):
|
def ensure_list(self):
|
||||||
"""
|
"""
|
||||||
Ensure all values in the data field are lists.
|
Ensure all values are lists.
|
||||||
Convert all strings to lists with one item.
|
|
||||||
"""
|
"""
|
||||||
for field, value in self.parsed.items():
|
for field, value in self.parsed.items():
|
||||||
if not isinstance(value, list):
|
if not isinstance(value, list):
|
||||||
@ -407,7 +337,6 @@ class NotificationRuleData(object):
|
|||||||
def validate_user_permissions(self):
|
def validate_user_permissions(self):
|
||||||
"""
|
"""
|
||||||
Ensure the user can use notification rules.
|
Ensure the user can use notification rules.
|
||||||
:raises RuleParseError: if the user does not have permission
|
|
||||||
"""
|
"""
|
||||||
if not self.user.has_perm("core.use_rules"):
|
if not self.user.has_perm("core.use_rules"):
|
||||||
raise RuleParseError("User does not have permission to use rules", "data")
|
raise RuleParseError("User does not have permission to use rules", "data")
|
||||||
@ -416,12 +345,6 @@ class NotificationRuleData(object):
|
|||||||
"""
|
"""
|
||||||
Validate the interval and window fields.
|
Validate the interval and window fields.
|
||||||
Prohibit window being specified with an ondemand interval.
|
Prohibit window being specified with an ondemand interval.
|
||||||
Prohibit window not being specified with a non-ondemand interval.
|
|
||||||
Prohibit amount being specified with an on-demand interval.
|
|
||||||
Prohibut amount not being specified with a non-ondemand interval.
|
|
||||||
Validate window field.
|
|
||||||
Validate window unit and enforce maximum.
|
|
||||||
:raises RuleParseError: if the fields are invalid
|
|
||||||
"""
|
"""
|
||||||
interval = self.cleaned_data.get("interval")
|
interval = self.cleaned_data.get("interval")
|
||||||
window = self.cleaned_data.get("window")
|
window = self.cleaned_data.get("window")
|
||||||
@ -480,9 +403,6 @@ class NotificationRuleData(object):
|
|||||||
def validate_permissions(self):
|
def validate_permissions(self):
|
||||||
"""
|
"""
|
||||||
Validate permissions for the source and index variables.
|
Validate permissions for the source and index variables.
|
||||||
Also set the default values for the user if not present.
|
|
||||||
Stores the default or expanded values in the parsed field.
|
|
||||||
:raises QueryError: if the user does not have permission to use the source
|
|
||||||
"""
|
"""
|
||||||
if "index" in self.parsed:
|
if "index" in self.parsed:
|
||||||
index = self.parsed["index"]
|
index = self.parsed["index"]
|
||||||
@ -511,7 +431,6 @@ class NotificationRuleData(object):
|
|||||||
def parse_data(self):
|
def parse_data(self):
|
||||||
"""
|
"""
|
||||||
Parse the data in the text field to YAML.
|
Parse the data in the text field to YAML.
|
||||||
:raises RuleParseError: if the data is invalid
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
self.parsed = load(self.data, Loader=Loader)
|
self.parsed = load(self.data, Loader=Loader)
|
||||||
@ -519,13 +438,7 @@ class NotificationRuleData(object):
|
|||||||
raise RuleParseError("data", f"Invalid YAML: {e}")
|
raise RuleParseError("data", f"Invalid YAML: {e}")
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
"""
|
|
||||||
Get a YAML representation of the data field of the rule.
|
|
||||||
"""
|
|
||||||
return dump(self.parsed, Dumper=Dumper)
|
return dump(self.parsed, Dumper=Dumper)
|
||||||
|
|
||||||
def get_data(self):
|
def get_data(self):
|
||||||
"""
|
|
||||||
Return the data field as a dictionary.
|
|
||||||
"""
|
|
||||||
return self.parsed
|
return self.parsed
|
||||||
|
@ -19,7 +19,6 @@ def process_rules(data):
|
|||||||
# up a NotificationRuleData object
|
# up a NotificationRuleData object
|
||||||
parsed_rule = rule.parse()
|
parsed_rule = rule.parse()
|
||||||
matched = {}
|
matched = {}
|
||||||
# Rule is invalid, this shouldn't happen
|
|
||||||
if "index" not in parsed_rule:
|
if "index" not in parsed_rule:
|
||||||
continue
|
continue
|
||||||
if "source" not in parsed_rule:
|
if "source" not in parsed_rule:
|
||||||
@ -31,10 +30,8 @@ def process_rules(data):
|
|||||||
# if not type(rule_source) == list:
|
# if not type(rule_source) == list:
|
||||||
# rule_source = [rule_source]
|
# rule_source = [rule_source]
|
||||||
if index not in rule_index:
|
if index not in rule_index:
|
||||||
# We don't care about this index, go to the next one
|
|
||||||
continue
|
continue
|
||||||
if message["src"] not in rule_source:
|
if message["src"] not in rule_source:
|
||||||
# We don't care about this source, go to the next one
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
matched["index"] = index
|
matched["index"] = index
|
||||||
@ -46,11 +43,8 @@ def process_rules(data):
|
|||||||
# if not type(value) == list:
|
# if not type(value) == list:
|
||||||
# value = [value]
|
# value = [value]
|
||||||
if field == "src":
|
if field == "src":
|
||||||
# We already checked this
|
|
||||||
continue
|
continue
|
||||||
if field == "tokens":
|
if field == "tokens":
|
||||||
# Check if tokens are in the rule
|
|
||||||
# We only check if *at least one* token matches
|
|
||||||
for token in value:
|
for token in value:
|
||||||
if "tokens" in message:
|
if "tokens" in message:
|
||||||
if token in message["tokens"]:
|
if token in message["tokens"]:
|
||||||
@ -61,8 +55,8 @@ def process_rules(data):
|
|||||||
# Continue to next field
|
# Continue to next field
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Allow partial matches for msg
|
||||||
if field == "msg":
|
if field == "msg":
|
||||||
# Allow partial matches for msg
|
|
||||||
for msg in value:
|
for msg in value:
|
||||||
if "msg" in message:
|
if "msg" in message:
|
||||||
if msg.lower() in message["msg"].lower():
|
if msg.lower() in message["msg"].lower():
|
||||||
@ -73,10 +67,8 @@ def process_rules(data):
|
|||||||
# Continue to next field
|
# Continue to next field
|
||||||
continue
|
continue
|
||||||
if field in message and message[field] in value:
|
if field in message and message[field] in value:
|
||||||
# Do exact matches for all other fields
|
|
||||||
matched_field_number += 1
|
matched_field_number += 1
|
||||||
matched[field] = message[field]
|
matched[field] = message[field]
|
||||||
# Subtract 2, 1 for source and 1 for index
|
|
||||||
if matched_field_number == rule_field_length - 2:
|
if matched_field_number == rule_field_length - 2:
|
||||||
rule_notify(rule, index, message, matched)
|
rule_notify(rule, index, message, matched)
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ from django.core.management.base import BaseCommand
|
|||||||
|
|
||||||
from core.db.storage import db
|
from core.db.storage import db
|
||||||
from core.lib.parsing import QueryError
|
from core.lib.parsing import QueryError
|
||||||
from core.lib.rules import NotificationRuleData, RuleParseError
|
from core.lib.rules import NotificationRuleData
|
||||||
from core.models import NotificationRule
|
from core.models import NotificationRule
|
||||||
from core.util import logs
|
from core.util import logs
|
||||||
|
|
||||||
@ -31,8 +31,6 @@ async def job(interval_seconds):
|
|||||||
# results = await db.schedule_query_results(rule.user, rule)
|
# results = await db.schedule_query_results(rule.user, rule)
|
||||||
except QueryError as e:
|
except QueryError as e:
|
||||||
log.error(f"Error running rule {rule}: {e}")
|
log.error(f"Error running rule {rule}: {e}")
|
||||||
except RuleParseError as e:
|
|
||||||
log.error(f"Error parsing rule {rule}: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
|
Loading…
Reference in New Issue
Block a user