@ -28,15 +28,15 @@ class RuleParseError(Exception):
self . field = field
def rule_matched( rule , message , matched ) :
title = f " Rule { rule . name } matched "
notification_settings = rule . get_notification_settings ( )
cast = {
" title " : title ,
* * notification_settings ,
}
if rule. service == " ntfy " :
def format_ntfy( * * kwargs ) :
"" "
Format a message for ntfy .
"""
rule = kwargs . get ( " rule " )
index = kwargs . get ( " index " )
message = kwargs . get ( " message " )
matched = kwargs . get ( " matched " )
if message :
# Dump the message in YAML for readability
messages_formatted = " "
if isinstance ( message , list ) :
@ -47,25 +47,70 @@ def rule_matched(rule, message, matched):
messages_formatted + = " \n "
else :
messages_formatted = dump ( message , Dumper = Dumper , default_flow_style = False )
else :
messages_formatted = " "
if matched :
matched = " , " . join ( [ f " { k } : { v } " for k , v in matched . items ( ) ] )
else :
matched = " "
notify_message = f " { rule . name } on { index } : { matched } \n { messages_formatted } "
notify_message = notify_message . encode ( " utf-8 " , " replace " )
return notify_message
def format_webhook ( * * kwargs ) :
"""
Format a message for a webhook .
"""
rule = kwargs . get ( " rule " )
index = kwargs . get ( " index " )
message = kwargs . get ( " message " )
matched = kwargs . get ( " matched " )
notification_settings = kwargs . get ( " notification_settings " )
notify_message = {
" rule_id " : rule . id ,
" rule_name " : rule . name ,
" match " : matched ,
" index " : index ,
" data " : message ,
}
if " priority " in notification_settings :
notify_message [ " priority " ] = notification_settings [ " priority " ]
if " topic " in notification_settings :
notify_message [ " topic " ] = notification_settings [ " topic " ]
notify_message = orjson . dumps ( notify_message )
return notify_message
notify_message = f " { rule . name } match: { matched } \n { messages_formatted } "
notify_message = notify_message . encode ( " utf-8 " , " replace " )
def rule_notify ( rule , index , message , matched ) :
if message :
word = " match "
else :
word = " no match "
title = f " Rule { rule . name } { word } on { index } "
notification_settings = rule . get_notification_settings ( )
if not notification_settings :
return
cast = {
" title " : title ,
" user " : rule . user ,
" rule " : rule ,
" index " : index ,
" message " : message ,
" matched " : matched ,
" notification_settings " : notification_settings ,
}
if rule . service == " ntfy " :
cast [ " msg " ] = format_ntfy ( * * cast )
elif rule . service == " webhook " :
notify_message = {
" rule_id " : rule . id ,
" rule_name " : rule . name ,
" match " : matched ,
" data " : message ,
}
if " priority " in notification_settings :
notify_message [ " priority " ] = notification_settings [ " priority " ]
if " topic " in notification_settings :
notify_message [ " topic " ] = notification_settings [ " topic " ]
notify_message = orjson . dumps ( notify_message )
cast [ " msg " ] = format_webhook ( * * cast )
sendmsg ( rule . user , notify_message , * * cast )
sendmsg ( * * cast )
class NotificationRuleData ( object ) :
@ -101,11 +146,15 @@ class NotificationRuleData(object):
if not isinstance ( self . object . match , dict ) :
self . object . match = { }
self . object . match [ index ] = match
if index is None :
for index_iter in self . parsed [ " index " ] :
self . object . match [ index_iter ] = match
else :
self . object . match [ index ] = match
self . object . save ( )
log . debug ( f " Stored match: { index } - { match } " )
def get_match ( self , index ) :
def get_match ( self , index = None ) :
"""
Get a match result for an index .
"""
@ -114,6 +163,10 @@ class NotificationRuleData(object):
if not isinstance ( self . object . match , dict ) :
return None
if index is None :
# Check if we have any matches on all indices
return any ( self . object . match . values ( ) )
return self . object . match . get ( index )
def format_aggs ( self , aggs ) :
@ -134,14 +187,38 @@ class NotificationRuleData(object):
return new_aggs
def rule_matched ( self , index , message , aggs ) :
"""
A rule has matched .
"""
current_match = self . get_match ( index )
if current_match is False :
# Matched now, but not before
formatted_aggs = self . format_aggs ( aggs )
rule_notify ( self . object , index , message , formatted_aggs )
self . store_match ( index , True )
def rule_no_match ( self , index = None ) :
"""
A rule has not matched .
"""
current_match = self . get_match ( index )
if current_match is True :
# Matched before, but not now
if self . object . send_empty :
rule_notify ( self . object , index , " no_match " , None )
self . store_match ( index , False )
async def run_schedule ( self ) :
"""
Run the schedule query .
"""
response = await self . db . schedule_query_results ( self )
if not response :
self . rule_no_match ( )
for index , ( aggs , results ) in response . items ( ) :
if not results :
self . store_match ( index , False )
self . rul e_not_ matched ( index )
aggs_for_index = [ ]
for agg_name in self . aggs . keys ( ) :
@ -154,15 +231,9 @@ class NotificationRuleData(object):
if all ( aggs_for_index ) :
# Ensure we only send notifications when the previous run
# did not return any matches
current_match = self . get_match ( index )
if current_match is False :
formatted_aggs = self . format_aggs ( aggs )
rule_matched (
self . object , results [ : self . object . amount ] , formatted_aggs
)
self . store_match ( index , True )
self . rule_matched ( index , results [ : self . object . amount ] , aggs )
continue
self . sto re_match( index , False )
self . rule_not_matched ( index )
def test_schedule ( self ) :
"""