You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
neptune/core/management/commands/processing.py

109 lines
4.7 KiB
Python

import msgpack
from asgiref.sync import async_to_sync
from django.core.management.base import BaseCommand
from redis import StrictRedis
from core.db.storage import db
from core.lib.rules import NotificationRuleData
from core.models import NotificationRule
from core.util import logs
# Module-level logger for this command, requested from the project's
# logging helper under the name "processing".
log = logs.get_logger("processing")
# Rule keys that scope a rule to an index/source. They are verified up front
# and must not be evaluated (or counted) again as content fields.
_SCOPE_FIELDS = ("index", "source")


def _match_field(field, wanted, message):
    """Check a single content field of a rule against a message.

    Args:
        field: Rule field name; also the key looked up in ``message``.
        wanted: Collection of acceptable values for the field.
            (assumes rule values are lists, not scalars -- TODO confirm
            against ``NotificationRule.parse``)
        message: The message dict being tested.

    Returns:
        A ``(found, value)`` tuple. ``value`` is the rule token/substring or
        the message value that satisfied the field, ``None`` if not found.
    """
    if field not in message:
        return False, None
    actual = message[field]

    if field == "tokens":
        # We only require that *at least one* token matches
        for token in wanted:
            if token in actual:
                return True, token
        return False, None

    if field == "msg":
        # Allow case-insensitive partial matches for msg
        for needle in wanted:
            if needle.lower() in actual.lower():
                return True, needle
        return False, None

    # Do exact matches for all other fields
    if actual in wanted:
        return True, actual
    return False, None


def _match_rule(parsed_rule, index, message):
    """Return the matched fields if ``message`` satisfies the rule, else None.

    Args:
        parsed_rule: Dict produced by ``rule.parse()``.
        index: Name of the index the message arrived on.
        message: The message dict being tested.

    Returns:
        A dict with ``index``, ``source`` and one entry per matched content
        field, or ``None`` if the rule is invalid or any field fails.
    """
    # Rule is invalid, this shouldn't happen
    if "index" not in parsed_rule or "source" not in parsed_rule:
        return None
    if index not in parsed_rule["index"]:
        # We don't care about this index
        return None
    # A message without a source cannot match any rule; skip it instead of
    # raising KeyError and killing the listener.
    if "src" not in message or message["src"] not in parsed_rule["source"]:
        return None

    matched = {"index": index, "source": message["src"]}
    for field, wanted in parsed_rule.items():
        if field in _SCOPE_FIELDS:
            # Already checked above. The rule key is "source" (the message
            # key is "src"), so these are the names that must be skipped.
            continue
        found, value = _match_field(field, wanted, message)
        if not found:
            # Every content field has to match; stop at the first miss
            return None
        matched[field] = value
    return matched


def process_rules(data):
    """Match a batch of messages against the stored rules and deliver hits.

    Rules are loaded with ``enabled=True, interval=0``. For every message
    that satisfies all fields of a rule, the rule's ``rule_matched``
    coroutine is run synchronously with ``mode="ondemand"``.

    Args:
        data: Mapping of index name to an iterable of message dicts.

    Returns:
        None.
    """
    if not data:
        # Nothing to match; avoid building the rule queryset at all
        return

    all_rules = NotificationRule.objects.filter(enabled=True, interval=0)

    for index, index_messages in data.items():
        for message in index_messages:
            for rule in all_rules:
                # rule.parse() is a quicker helper to get the rule data
                # without spinning up a NotificationRuleData object.
                # NOTE(review): it is re-run for every message; it could be
                # cached per batch if parse() is side-effect free.
                matched = _match_rule(rule.parse(), index, message)
                if matched is None:
                    continue

                meta = {"matched": matched, "total_hits": 1}
                # Build the full rule object only on a hit: it lets us reuse
                # the same delivery code as for scheduling.
                rule_data_object = NotificationRuleData(rule.user, rule, db=db)
                log.debug("Rule %s matched a message on index %s", rule, index)
                rule_matched = async_to_sync(rule_data_object.rule_matched)
                rule_matched(index, message, meta=meta, mode="ondemand")
class Command(BaseCommand):
    """Management command that feeds Redis pub/sub messages to the rules."""

    def handle(self, *args, **options):
        """Listen on the ``messages`` channel and process each payload.

        Connects to Redis over the unix socket, pattern-subscribes to
        ``messages`` and blocks forever on the listener. Each payload is
        msgpack-decoded and handed to ``process_rules``. Payloads that
        cannot be decoded are skipped so one bad message does not stop the
        listener.

        Args:
            *args: Unused positional arguments from Django.
            **options: Unused command options from Django.
        """
        r = StrictRedis(unix_socket_path="/var/run/socks/redis.sock", db=0)
        p = r.pubsub()
        p.psubscribe("messages")
        for message in p.listen():
            if not message or message["channel"] != b"messages":
                continue
            data = message["data"]
            try:
                unpacked = msgpack.unpackb(data, raw=False)
            except TypeError:
                # Non-bytes payload. redis-py's subscribe confirmation
                # carries an integer count as its data, which lands here.
                continue
            except ValueError:
                # Truncated or malformed msgpack (msgpack's unpack errors
                # derive from ValueError). Skip it rather than crash.
                log.warning("Discarding undecodable message payload")
                continue
            process_rules(unpacked)